Skip to main content

rivet/pipeline/
reconcile_cmd.rs

//! `rivet reconcile` — partition/window reconciliation (Epic F).
//!
//! Re-runs a per-partition `COUNT(*)` against the source for a chunked export
//! and compares the result with the stored per-chunk row counts. Produces a
//! [`ReconcileReport`] — matches, mismatches, and repair candidates.
//!
//! v1 supports **chunked exports with `chunk_checkpoint: true`** (so per-chunk
//! row counts and ranges are persisted). Every other mode is rejected with an
//! explanatory error: time-window is reported as "not supported in v1", while
//! snapshot / incremental / keyset have no natural partitions to reconcile —
//! their reconcile semantics differ (see roadmap: Epic F & Epic G).
11
12use std::collections::HashMap;
13use std::path::Path;
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::plan::{
18    ExtractionStrategy, PartitionKind, PartitionResult, ReconcileReport, ResolvedRunPlan,
19    build_plan,
20};
21use crate::source;
22use crate::state::{ChunkTaskInfo, StateStore};
23
24use super::chunked::build_chunk_query_sql;
25
/// Output format for the reconcile report.
///
/// Derives the common traits (`Debug`, `Clone`, `PartialEq`, `Eq`) so the
/// selected format can be logged, duplicated and compared in tests without
/// callers having to pattern-match by hand.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReconcileOutputFormat {
    /// Human-readable summary printed to stdout.
    Pretty,
    /// Pretty-printed JSON written to the given path (or stdout if `None`).
    Json(Option<String>),
}
33
34pub fn run_reconcile_command(
35    config_path: &str,
36    export_name: &str,
37    params: Option<&HashMap<String, String>>,
38    format: ReconcileOutputFormat,
39) -> Result<()> {
40    let config = Config::load_with_params(config_path, params)?;
41    let config_dir = Path::new(config_path)
42        .parent()
43        .unwrap_or_else(|| Path::new("."));
44
45    let export = config
46        .exports
47        .iter()
48        .find(|e| e.name == export_name)
49        .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
50
51    let plan = build_plan(&config, export, config_dir, false, false, false, params)?;
52
53    let state_path = config_dir.join(".rivet_state.db");
54    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
55
56    let report = match &plan.strategy {
57        ExtractionStrategy::Chunked(_) => reconcile_chunked(&plan, &state, export)?,
58        ExtractionStrategy::TimeWindow { .. } => {
59            anyhow::bail!(
60                "reconcile: time-window mode is not supported in v1 (Epic F). \
61                 Convert to chunked mode with `chunk_by_days` for partition-level reconcile."
62            );
63        }
64        ExtractionStrategy::Snapshot
65        | ExtractionStrategy::Incremental(_)
66        | ExtractionStrategy::Keyset(_) => {
67            anyhow::bail!(
68                "reconcile: '{}' mode has no natural partitions — use `rivet run --reconcile` for a whole-export count check",
69                plan.strategy.mode_label()
70            );
71        }
72    };
73
74    emit_report(&report, &format)?;
75    Ok(())
76}
77
78/// Run a reconcile pass against the latest chunk run and return the report.
79/// Exposed so `rivet repair --auto` can build a repair plan from a fresh reconcile
80/// without duplicating the logic.
81pub(crate) fn reconcile_chunked_fresh(
82    plan: &ResolvedRunPlan,
83    state: &StateStore,
84) -> Result<ReconcileReport> {
85    reconcile_chunked_inner(plan, state)
86}
87
88fn reconcile_chunked(
89    plan: &ResolvedRunPlan,
90    state: &StateStore,
91    _export: &ExportConfig,
92) -> Result<ReconcileReport> {
93    reconcile_chunked_inner(plan, state)
94}
95
96fn reconcile_chunked_inner(plan: &ResolvedRunPlan, state: &StateStore) -> Result<ReconcileReport> {
97    let (run_id, _plan_hash, _status, _updated) = state
98        .get_latest_chunk_run(&plan.export_name)?
99        .ok_or_else(|| {
100            anyhow::anyhow!(
101                "reconcile: no chunk run recorded for export '{}'. \
102                 Enable `chunk_checkpoint: true` and run the export first.",
103                plan.export_name
104            )
105        })?;
106
107    let tasks = state.list_chunk_tasks_for_run(&run_id)?;
108    if tasks.is_empty() {
109        anyhow::bail!(
110            "reconcile: chunk run '{}' for export '{}' has no tasks",
111            run_id,
112            plan.export_name
113        );
114    }
115
116    let mut src = source::create_source(&plan.source)?;
117    let partitions = reconcile_chunked_tasks(plan, &tasks, |chunk_query| {
118        let count_sql = format!("SELECT COUNT(*) FROM ({chunk_query}) AS _rc");
119        let raw = src.query_scalar(&count_sql)?;
120        Ok(raw.and_then(|s| s.trim().parse::<i64>().ok()))
121    })?;
122
123    let report = ReconcileReport::new(
124        plan.export_name.clone(),
125        run_id.clone(),
126        plan.strategy.mode_label().to_string(),
127        partitions,
128    );
129
130    // Epic G: a reconcile with no mismatches and no unknowns is a fresh verified boundary.
131    if report.summary.mismatches == 0 && report.summary.unknown == 0 {
132        let highest = tasks
133            .iter()
134            .filter(|t| t.status == "completed")
135            .map(|t| t.chunk_index)
136            .max();
137        if let Some(idx) = highest
138            && let Err(e) = state.record_verified_chunked(&plan.export_name, idx, &run_id)
139        {
140            log::warn!(
141                "export '{}': verified boundary update failed: {:#}",
142                plan.export_name,
143                e
144            );
145        }
146    }
147
148    Ok(report)
149}
150
151/// Pure classification of `ChunkTaskInfo`s into `PartitionResult`s.
152///
153/// Abstracts the source round-trip via `count_source` so the logic can be
154/// exercised in unit tests without a live database.
155pub(crate) fn reconcile_chunked_tasks<F>(
156    plan: &ResolvedRunPlan,
157    tasks: &[ChunkTaskInfo],
158    mut count_source: F,
159) -> Result<Vec<PartitionResult>>
160where
161    F: FnMut(&str) -> Result<Option<i64>>,
162{
163    let cp = match &plan.strategy {
164        ExtractionStrategy::Chunked(cp) => cp,
165        _ => anyhow::bail!("reconcile_chunked_tasks requires Chunked strategy"),
166    };
167
168    let mut out: Vec<PartitionResult> = Vec::with_capacity(tasks.len());
169    for t in tasks {
170        let exported = if t.status == "completed" {
171            t.rows_written
172        } else {
173            None
174        };
175
176        let (start, end) = match (t.start_key.parse::<i64>(), t.end_key.parse::<i64>()) {
177            (Ok(s), Ok(e)) => (s, e),
178            _ => {
179                // Unparseable chunk keys: keep exported count but cannot re-count source.
180                out.push(PartitionResult::classify(
181                    PartitionKind::Chunk,
182                    format!("chunk {} [{}..{}]", t.chunk_index, t.start_key, t.end_key),
183                    None,
184                    exported,
185                ));
186                continue;
187            }
188        };
189
190        let chunk_query = build_chunk_query_sql(
191            &plan.base_query,
192            &cp.column,
193            start,
194            end,
195            cp.dense,
196            cp.by_days.is_some(),
197            plan.source.source_type,
198        );
199        let source_count = count_source(&chunk_query)?;
200
201        out.push(PartitionResult::classify(
202            PartitionKind::Chunk,
203            format!("chunk {} [{}..{}]", t.chunk_index, start, end),
204            source_count,
205            exported,
206        ));
207    }
208    Ok(out)
209}
210
211fn emit_report(report: &ReconcileReport, format: &ReconcileOutputFormat) -> Result<()> {
212    match format {
213        ReconcileOutputFormat::Pretty => {
214            print_report_pretty(report);
215        }
216        ReconcileOutputFormat::Json(None) => {
217            println!("{}", report.to_json_pretty()?);
218        }
219        ReconcileOutputFormat::Json(Some(path)) => {
220            let json = report.to_json_pretty()?;
221            std::fs::write(path, &json)
222                .map_err(|e| anyhow::anyhow!("cannot write reconcile report '{}': {}", path, e))?;
223            println!("Reconcile report written to: {}", path);
224        }
225    }
226    Ok(())
227}
228
229fn print_report_pretty(report: &ReconcileReport) {
230    println!();
231    println!("  Export    : {}", report.export_name);
232    println!("  Run       : {}", report.run_id);
233    println!("  Strategy  : {}", report.strategy);
234    println!(
235        "  Partitions: {} ({} match, {} mismatch, {} unknown)",
236        report.summary.total_partitions,
237        report.summary.matches,
238        report.summary.mismatches,
239        report.summary.unknown,
240    );
241    println!(
242        "  Rows      : source {} / exported {}",
243        report.summary.total_source_rows, report.summary.total_exported_rows,
244    );
245
246    let repair = report.repair_candidates();
247    if repair.is_empty() {
248        println!("  Status    : all partitions match");
249    } else {
250        println!("  Repair candidates:");
251        for p in repair {
252            println!("    • {} — {}", p.identifier, format_status_note(p));
253        }
254    }
255    println!();
256}
257
258fn format_status_note(p: &PartitionResult) -> String {
259    let s = match (p.source_count, p.exported_count) {
260        (Some(s), Some(e)) => format!("source={s}, exported={e}"),
261        (Some(s), None) => format!("source={s}, exported=n/a"),
262        (None, Some(e)) => format!("source=n/a, exported={e}"),
263        (None, None) => "no counts".to_string(),
264    };
265    if p.note.is_empty() {
266        s
267    } else {
268        format!("{s} ({})", p.note)
269    }
270}
271
#[cfg(test)]
mod tests {
    //! Unit tests for [`reconcile_chunked_tasks`], driven by stubbed source
    //! counts so no database is required.

    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{ChunkedPlan, ExtractionStrategy, PartitionStatus};
    use crate::state::ChunkTaskInfo;
    use crate::tuning::SourceTuning;

    /// Minimal chunked plan over `orders`, chunked on `id`, Postgres source.
    fn chunked_plan() -> ResolvedRunPlan {
        let strategy = ExtractionStrategy::Chunked(ChunkedPlan {
            column: "id".into(),
            chunk_size: 100,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            checkpoint: true,
            max_attempts: 3,
        });
        let destination = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("./out".into()),
            ..Default::default()
        };
        let source = SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://localhost/test".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        };

        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT * FROM orders".into(),
            strategy,
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination,
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source,
            column_overrides: Default::default(),
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// Builds a stored chunk task with one attempt and no error / file name.
    fn task(idx: i64, start: &str, end: &str, status: &str, rows: Option<i64>) -> ChunkTaskInfo {
        ChunkTaskInfo {
            chunk_index: idx,
            start_key: start.into(),
            end_key: end.into(),
            status: status.into(),
            attempts: 1,
            last_error: None,
            rows_written: rows,
            file_name: None,
        }
    }

    #[test]
    fn matches_and_mismatches_are_classified() {
        let plan = chunked_plan();
        let tasks = vec![
            task(0, "1", "100", "completed", Some(42)),
            task(1, "101", "200", "completed", Some(30)),
        ];
        // Stub source: chunk 0 matches, chunk 1 undercounts on export side.
        let mut source_counts = [42_i64, 33].into_iter();
        let parts =
            reconcile_chunked_tasks(&plan, &tasks, |_query| Ok(source_counts.next())).unwrap();

        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].status, PartitionStatus::Match);

        let undercounted = &parts[1];
        assert_eq!(undercounted.status, PartitionStatus::Mismatch);
        assert_eq!(undercounted.source_count, Some(33));
        assert_eq!(undercounted.exported_count, Some(30));
    }

    #[test]
    fn unfinished_task_is_unknown_and_does_not_hide_source_count() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "1", "100", "failed", None)];
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_query| Ok(Some(42))).unwrap();

        let failed = &parts[0];
        assert_eq!(failed.status, PartitionStatus::Unknown);
        assert_eq!(failed.source_count, Some(42));
        assert_eq!(failed.exported_count, None);
    }

    #[test]
    fn unparseable_chunk_keys_are_unknown_without_source_lookup() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "alpha", "omega", "completed", Some(5))];
        let mut lookups = 0_u32;
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_query| {
            lookups += 1;
            Ok(Some(99))
        })
        .unwrap();

        assert!(
            lookups == 0,
            "reconcile must skip source count for unparseable chunk keys"
        );
        assert_eq!(parts[0].status, PartitionStatus::Unknown);
        assert_eq!(parts[0].exported_count, Some(5));
    }

    #[test]
    fn chunk_query_passes_through_chunked_math() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "10", "20", "completed", Some(5))];
        let mut captured = String::new();
        reconcile_chunked_tasks(&plan, &tasks, |query| {
            captured = query.to_string();
            Ok(Some(5))
        })
        .unwrap();

        // Must reuse the same WHERE predicate used during extraction (ADR-0001 shape).
        assert!(captured.contains("BETWEEN 10 AND 20"), "got: {captured}");
        assert!(
            captured.contains("\"id\""),
            "identifier must be quoted: {captured}"
        );
    }
}
416}