Skip to main content

rivet/pipeline/
reconcile_cmd.rs

1//! `rivet reconcile` — partition/window reconciliation (Epic F).
2//!
3//! Re-runs per-partition `COUNT(*)` against the source for a chunked export and
4//! compares the result with the stored per-chunk row counts. Produces a
5//! [`ReconcileReport`] — matches, mismatches, and repair candidates.
6//!
7//! v1 supports **chunked exports with `chunk_checkpoint: true`** (so per-chunk
8//! row counts and ranges are persisted). Other modes return a clear
9//! "not supported in v1" error — reconcile semantics for snapshot / incremental
10//! / time-window differ (see roadmap: Epic F & Epic G).
11
12use std::collections::HashMap;
13use std::path::Path;
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::plan::{
18    ExtractionStrategy, PartitionKind, PartitionResult, ReconcileReport, ResolvedRunPlan,
19    build_plan,
20};
21use crate::source;
22use crate::state::{ChunkTaskInfo, StateStore};
23
24use super::chunked::build_chunk_query_sql;
25
/// Output format for the reconcile report.
///
/// Derives the standard comparison/debug traits so callers (CLI parsing,
/// tests) can inspect, clone and compare the selected format.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReconcileOutputFormat {
    /// Human-readable summary printed to stdout.
    Pretty,
    /// Pretty-printed JSON written to the given path (or stdout if `None`).
    Json(Option<String>),
}
33
34pub fn run_reconcile_command(
35    config_path: &str,
36    export_name: &str,
37    params: Option<&HashMap<String, String>>,
38    format: ReconcileOutputFormat,
39) -> Result<()> {
40    let config = Config::load_with_params(config_path, params)?;
41    let config_dir = Path::new(config_path)
42        .parent()
43        .unwrap_or_else(|| Path::new("."));
44
45    let export = config
46        .exports
47        .iter()
48        .find(|e| e.name == export_name)
49        .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
50
51    let plan = build_plan(&config, export, config_dir, false, false, false, params)?;
52
53    let state_path = config_dir.join(".rivet_state.db");
54    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
55
56    let report = match &plan.strategy {
57        ExtractionStrategy::Chunked(_) => reconcile_chunked(&plan, &state, export)?,
58        ExtractionStrategy::TimeWindow { .. } => {
59            anyhow::bail!(
60                "reconcile: time-window mode is not supported in v1 (Epic F). \
61                 Convert to chunked mode with `chunk_by_days` for partition-level reconcile."
62            );
63        }
64        ExtractionStrategy::Snapshot | ExtractionStrategy::Incremental(_) => {
65            anyhow::bail!(
66                "reconcile: '{}' mode has no natural partitions — use `rivet run --reconcile` for a whole-export count check",
67                plan.strategy.mode_label()
68            );
69        }
70    };
71
72    emit_report(&report, &format)?;
73    Ok(())
74}
75
76/// Run a reconcile pass against the latest chunk run and return the report.
77/// Exposed so `rivet repair --auto` can build a repair plan from a fresh reconcile
78/// without duplicating the logic.
79pub(crate) fn reconcile_chunked_fresh(
80    plan: &ResolvedRunPlan,
81    state: &StateStore,
82) -> Result<ReconcileReport> {
83    reconcile_chunked_inner(plan, state)
84}
85
86fn reconcile_chunked(
87    plan: &ResolvedRunPlan,
88    state: &StateStore,
89    _export: &ExportConfig,
90) -> Result<ReconcileReport> {
91    reconcile_chunked_inner(plan, state)
92}
93
94fn reconcile_chunked_inner(plan: &ResolvedRunPlan, state: &StateStore) -> Result<ReconcileReport> {
95    let (run_id, _plan_hash, _status, _updated) = state
96        .get_latest_chunk_run(&plan.export_name)?
97        .ok_or_else(|| {
98            anyhow::anyhow!(
99                "reconcile: no chunk run recorded for export '{}'. \
100                 Enable `chunk_checkpoint: true` and run the export first.",
101                plan.export_name
102            )
103        })?;
104
105    let tasks = state.list_chunk_tasks_for_run(&run_id)?;
106    if tasks.is_empty() {
107        anyhow::bail!(
108            "reconcile: chunk run '{}' for export '{}' has no tasks",
109            run_id,
110            plan.export_name
111        );
112    }
113
114    let mut src = source::create_source(&plan.source)?;
115    let partitions = reconcile_chunked_tasks(plan, &tasks, |chunk_query| {
116        let count_sql = format!("SELECT COUNT(*) FROM ({chunk_query}) AS _rc");
117        let raw = src.query_scalar(&count_sql)?;
118        Ok(raw.and_then(|s| s.trim().parse::<i64>().ok()))
119    })?;
120
121    let report = ReconcileReport::new(
122        plan.export_name.clone(),
123        run_id.clone(),
124        plan.strategy.mode_label().to_string(),
125        partitions,
126    );
127
128    // Epic G: a reconcile with no mismatches and no unknowns is a fresh verified boundary.
129    if report.summary.mismatches == 0 && report.summary.unknown == 0 {
130        let highest = tasks
131            .iter()
132            .filter(|t| t.status == "completed")
133            .map(|t| t.chunk_index)
134            .max();
135        if let Some(idx) = highest
136            && let Err(e) = state.record_verified_chunked(&plan.export_name, idx, &run_id)
137        {
138            log::warn!(
139                "export '{}': verified boundary update failed: {:#}",
140                plan.export_name,
141                e
142            );
143        }
144    }
145
146    Ok(report)
147}
148
149/// Pure classification of `ChunkTaskInfo`s into `PartitionResult`s.
150///
151/// Abstracts the source round-trip via `count_source` so the logic can be
152/// exercised in unit tests without a live database.
153pub(crate) fn reconcile_chunked_tasks<F>(
154    plan: &ResolvedRunPlan,
155    tasks: &[ChunkTaskInfo],
156    mut count_source: F,
157) -> Result<Vec<PartitionResult>>
158where
159    F: FnMut(&str) -> Result<Option<i64>>,
160{
161    let cp = match &plan.strategy {
162        ExtractionStrategy::Chunked(cp) => cp,
163        _ => anyhow::bail!("reconcile_chunked_tasks requires Chunked strategy"),
164    };
165
166    let mut out: Vec<PartitionResult> = Vec::with_capacity(tasks.len());
167    for t in tasks {
168        let exported = if t.status == "completed" {
169            t.rows_written
170        } else {
171            None
172        };
173
174        let (start, end) = match (t.start_key.parse::<i64>(), t.end_key.parse::<i64>()) {
175            (Ok(s), Ok(e)) => (s, e),
176            _ => {
177                // Unparseable chunk keys: keep exported count but cannot re-count source.
178                out.push(PartitionResult::classify(
179                    PartitionKind::Chunk,
180                    format!("chunk {} [{}..{}]", t.chunk_index, t.start_key, t.end_key),
181                    None,
182                    exported,
183                ));
184                continue;
185            }
186        };
187
188        let chunk_query = build_chunk_query_sql(
189            &plan.base_query,
190            &cp.column,
191            start,
192            end,
193            cp.dense,
194            cp.by_days.is_some(),
195            plan.source.source_type,
196        );
197        let source_count = count_source(&chunk_query)?;
198
199        out.push(PartitionResult::classify(
200            PartitionKind::Chunk,
201            format!("chunk {} [{}..{}]", t.chunk_index, start, end),
202            source_count,
203            exported,
204        ));
205    }
206    Ok(out)
207}
208
209fn emit_report(report: &ReconcileReport, format: &ReconcileOutputFormat) -> Result<()> {
210    match format {
211        ReconcileOutputFormat::Pretty => {
212            print_report_pretty(report);
213        }
214        ReconcileOutputFormat::Json(None) => {
215            println!("{}", report.to_json_pretty()?);
216        }
217        ReconcileOutputFormat::Json(Some(path)) => {
218            let json = report.to_json_pretty()?;
219            std::fs::write(path, &json)
220                .map_err(|e| anyhow::anyhow!("cannot write reconcile report '{}': {}", path, e))?;
221            println!("Reconcile report written to: {}", path);
222        }
223    }
224    Ok(())
225}
226
227fn print_report_pretty(report: &ReconcileReport) {
228    println!();
229    println!("  Export    : {}", report.export_name);
230    println!("  Run       : {}", report.run_id);
231    println!("  Strategy  : {}", report.strategy);
232    println!(
233        "  Partitions: {} ({} match, {} mismatch, {} unknown)",
234        report.summary.total_partitions,
235        report.summary.matches,
236        report.summary.mismatches,
237        report.summary.unknown,
238    );
239    println!(
240        "  Rows      : source {} / exported {}",
241        report.summary.total_source_rows, report.summary.total_exported_rows,
242    );
243
244    let repair = report.repair_candidates();
245    if repair.is_empty() {
246        println!("  Status    : all partitions match");
247    } else {
248        println!("  Repair candidates:");
249        for p in repair {
250            println!("    • {} — {}", p.identifier, format_status_note(p));
251        }
252    }
253    println!();
254}
255
256fn format_status_note(p: &PartitionResult) -> String {
257    let s = match (p.source_count, p.exported_count) {
258        (Some(s), Some(e)) => format!("source={s}, exported={e}"),
259        (Some(s), None) => format!("source={s}, exported=n/a"),
260        (None, Some(e)) => format!("source=n/a, exported={e}"),
261        (None, None) => "no counts".to_string(),
262    };
263    if p.note.is_empty() {
264        s
265    } else {
266        format!("{s} ({})", p.note)
267    }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{ChunkedPlan, ExtractionStrategy};
    use crate::state::ChunkTaskInfo;
    use crate::tuning::SourceTuning;

    /// A minimal chunked Postgres plan over `orders`, chunked by `id`.
    fn chunked_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT * FROM orders".into(),
            strategy: ExtractionStrategy::Chunked(ChunkedPlan {
                column: "id".into(),
                chunk_size: 100,
                chunk_count: None,
                parallel: 1,
                dense: false,
                by_days: None,
                checkpoint: true,
                max_attempts: 3,
            }),
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// Builds a chunk task with the given index, key range, status and row count.
    fn task(idx: i64, start: &str, end: &str, status: &str, rows: Option<i64>) -> ChunkTaskInfo {
        ChunkTaskInfo {
            chunk_index: idx,
            start_key: start.into(),
            end_key: end.into(),
            status: status.into(),
            attempts: 1,
            last_error: None,
            rows_written: rows,
            file_name: None,
        }
    }

    #[test]
    fn matches_and_mismatches_are_classified() {
        let plan = chunked_plan();
        let tasks = vec![
            task(0, "1", "100", "completed", Some(42)),
            task(1, "101", "200", "completed", Some(30)),
        ];
        // Stub source: chunk 0 matches, chunk 1 undercounts on export side.
        let mut n = 0;
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            n += 1;
            Ok(Some(if n == 1 { 42 } else { 33 }))
        })
        .unwrap();

        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Match);
        assert_eq!(parts[1].status, crate::plan::PartitionStatus::Mismatch);
        assert_eq!(parts[1].source_count, Some(33));
        assert_eq!(parts[1].exported_count, Some(30));
    }

    #[test]
    fn unfinished_task_is_unknown_and_does_not_hide_source_count() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "1", "100", "failed", None)];
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| Ok(Some(42))).unwrap();
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Unknown);
        assert_eq!(parts[0].source_count, Some(42));
        assert_eq!(parts[0].exported_count, None);
    }

    #[test]
    fn unparseable_chunk_keys_are_unknown_without_source_lookup() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "alpha", "omega", "completed", Some(5))];
        let mut called = false;
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            called = true;
            Ok(Some(99))
        })
        .unwrap();
        assert!(
            !called,
            "reconcile must skip source count for unparseable chunk keys"
        );
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Unknown);
        assert_eq!(parts[0].exported_count, Some(5));
    }

    #[test]
    fn chunk_query_passes_through_chunked_math() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "10", "20", "completed", Some(5))];
        let mut captured = String::new();
        reconcile_chunked_tasks(&plan, &tasks, |q| {
            captured = q.to_string();
            Ok(Some(5))
        })
        .unwrap();
        // Must reuse the same WHERE predicate used during extraction (ADR-0001 shape).
        assert!(captured.contains("BETWEEN 10 AND 20"), "got: {captured}");
        assert!(
            captured.contains("\"id\""),
            "identifier must be quoted: {captured}"
        );
    }

    #[test]
    fn non_chunked_plan_is_rejected_without_source_lookup() {
        let mut plan = chunked_plan();
        plan.strategy = ExtractionStrategy::Snapshot;
        let tasks = vec![task(0, "1", "100", "completed", Some(1))];
        let mut called = false;
        let result = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            called = true;
            Ok(Some(1))
        });
        assert!(result.is_err(), "non-chunked strategies must be rejected");
        assert!(!called, "a rejected plan must not query the source");
    }

    #[test]
    fn source_count_error_aborts_reconcile() {
        let plan = chunked_plan();
        let tasks = vec![
            task(0, "1", "100", "completed", Some(42)),
            task(1, "101", "200", "completed", Some(30)),
        ];
        let mut calls = 0;
        let result = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            calls += 1;
            Err(anyhow::anyhow!("source unavailable"))
        });
        let Err(err) = result else {
            panic!("a failing source count must fail the reconcile");
        };
        assert!(err.to_string().contains("source unavailable"), "got: {err}");
        assert_eq!(calls, 1, "reconcile must stop at the first failing count");
    }
}