1use std::collections::HashMap;
13use std::path::Path;
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::plan::{
18 ExtractionStrategy, PartitionKind, PartitionResult, ReconcileReport, ReconcileSummary,
19 ResolvedRunPlan, build_plan,
20};
21use crate::source;
22use crate::state::{ChunkTaskInfo, StateStore};
23
24use super::chunked::build_chunk_query_sql;
25
/// How `rivet reconcile` presents its report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReconcileOutputFormat {
    /// Human-readable summary printed to stdout.
    Pretty,
    /// Pretty-printed JSON: printed to stdout when `None`, or written to the given file path
    /// when `Some`.
    Json(Option<String>),
}
33
34pub fn run_reconcile_command(
35 config_path: &str,
36 export_name: &str,
37 params: Option<&HashMap<String, String>>,
38 format: ReconcileOutputFormat,
39) -> Result<()> {
40 let config = Config::load_with_params(config_path, params)?;
41 let config_dir = Path::new(config_path)
42 .parent()
43 .unwrap_or_else(|| Path::new("."));
44
45 let export = config
46 .exports
47 .iter()
48 .find(|e| e.name == export_name)
49 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
50
51 let plan = build_plan(&config, export, config_dir, false, false, false, params)?;
52
53 let state_path = config_dir.join(".rivet_state.db");
54 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
55
56 let report = match &plan.strategy {
57 ExtractionStrategy::Chunked(_) => reconcile_chunked(&plan, &state, export)?,
58 ExtractionStrategy::TimeWindow { .. } => {
59 anyhow::bail!(
60 "reconcile: time-window mode is not supported in v1 (Epic F). \
61 Convert to chunked mode with `chunk_by_days` for partition-level reconcile."
62 );
63 }
64 ExtractionStrategy::Snapshot
65 | ExtractionStrategy::Incremental(_)
66 | ExtractionStrategy::Keyset(_) => {
67 anyhow::bail!(
68 "reconcile: '{}' mode has no natural partitions — use `rivet run --reconcile` for a whole-export count check",
69 plan.strategy.mode_label()
70 );
71 }
72 };
73
74 emit_report(&report, &format)?;
75 enforce_reconcile_exit(&report.summary)
76}
77
78fn enforce_reconcile_exit(summary: &ReconcileSummary) -> Result<()> {
94 if summary.unknown > 0 {
95 log::warn!(
96 "reconcile: {} of {} partition(s) could not be verified (incomplete chunk or \
97 non-integer keyset key — no source re-count); not counted as a mismatch",
98 summary.unknown,
99 summary.total_partitions
100 );
101 }
102 if summary.mismatches > 0 {
103 anyhow::bail!(
104 "reconcile: {} of {} partition(s) disagree with the source — see the report above",
105 summary.mismatches,
106 summary.total_partitions
107 );
108 }
109 Ok(())
110}
111
112pub(crate) fn reconcile_chunked_fresh(
116 plan: &ResolvedRunPlan,
117 state: &StateStore,
118) -> Result<ReconcileReport> {
119 reconcile_chunked_inner(plan, state)
120}
121
122fn reconcile_chunked(
123 plan: &ResolvedRunPlan,
124 state: &StateStore,
125 _export: &ExportConfig,
126) -> Result<ReconcileReport> {
127 reconcile_chunked_inner(plan, state)
128}
129
130fn reconcile_chunked_inner(plan: &ResolvedRunPlan, state: &StateStore) -> Result<ReconcileReport> {
131 let (run_id, _plan_hash, _status, _updated) = state
132 .get_latest_chunk_run(&plan.export_name)?
133 .ok_or_else(|| {
134 anyhow::anyhow!(
135 "reconcile: no chunk run recorded for export '{}'. \
136 Enable `chunk_checkpoint: true` and run the export first.",
137 plan.export_name
138 )
139 })?;
140
141 let tasks = state.list_chunk_tasks_for_run(&run_id)?;
142 if tasks.is_empty() {
143 anyhow::bail!(
144 "reconcile: chunk run '{}' for export '{}' has no tasks",
145 run_id,
146 plan.export_name
147 );
148 }
149
150 let mut src = source::create_source(&plan.source)?;
151 let partitions = reconcile_chunked_tasks(plan, &tasks, |chunk_query| {
152 let count_sql = format!("SELECT COUNT(*) FROM ({chunk_query}) AS _rc");
153 let raw = src.query_scalar(&count_sql)?;
154 Ok(raw.and_then(|s| s.trim().parse::<i64>().ok()))
155 })?;
156
157 let report = ReconcileReport::new(
158 plan.export_name.clone(),
159 run_id.clone(),
160 plan.strategy.mode_label().to_string(),
161 partitions,
162 );
163
164 if report.summary.mismatches == 0 && report.summary.unknown == 0 {
166 let highest = tasks
167 .iter()
168 .filter(|t| t.status == "completed")
169 .map(|t| t.chunk_index)
170 .max();
171 if let Some(idx) = highest
172 && let Err(e) = state.record_verified_chunked(&plan.export_name, idx, &run_id)
173 {
174 log::warn!(
175 "export '{}': verified boundary update failed: {:#}",
176 plan.export_name,
177 e
178 );
179 }
180 }
181
182 Ok(report)
183}
184
185pub(crate) fn reconcile_chunked_tasks<F>(
190 plan: &ResolvedRunPlan,
191 tasks: &[ChunkTaskInfo],
192 mut count_source: F,
193) -> Result<Vec<PartitionResult>>
194where
195 F: FnMut(&str) -> Result<Option<i64>>,
196{
197 let cp = match &plan.strategy {
198 ExtractionStrategy::Chunked(cp) => cp,
199 _ => anyhow::bail!("reconcile_chunked_tasks requires Chunked strategy"),
200 };
201
202 let mut out: Vec<PartitionResult> = Vec::with_capacity(tasks.len());
203 for t in tasks {
204 let exported = if t.status == "completed" {
205 t.rows_written
206 } else {
207 None
208 };
209
210 let (start, end) = match (t.start_key.parse::<i64>(), t.end_key.parse::<i64>()) {
211 (Ok(s), Ok(e)) => (s, e),
212 _ => {
213 out.push(PartitionResult::classify(
215 PartitionKind::Chunk,
216 format!("chunk {} [{}..{}]", t.chunk_index, t.start_key, t.end_key),
217 None,
218 exported,
219 ));
220 continue;
221 }
222 };
223
224 let chunk_query = build_chunk_query_sql(
225 &plan.base_query,
226 &cp.column,
227 start,
228 end,
229 cp.dense,
230 cp.by_days.is_some(),
231 plan.source.source_type,
232 );
233 let source_count = count_source(&chunk_query)?;
234
235 out.push(PartitionResult::classify(
236 PartitionKind::Chunk,
237 format!("chunk {} [{}..{}]", t.chunk_index, start, end),
238 source_count,
239 exported,
240 ));
241 }
242 Ok(out)
243}
244
245fn emit_report(report: &ReconcileReport, format: &ReconcileOutputFormat) -> Result<()> {
246 match format {
247 ReconcileOutputFormat::Pretty => {
248 print_report_pretty(report);
249 }
250 ReconcileOutputFormat::Json(None) => {
251 println!("{}", report.to_json_pretty()?);
252 }
253 ReconcileOutputFormat::Json(Some(path)) => {
254 let json = report.to_json_pretty()?;
255 std::fs::write(path, &json)
256 .map_err(|e| anyhow::anyhow!("cannot write reconcile report '{}': {}", path, e))?;
257 println!("Reconcile report written to: {}", path);
258 }
259 }
260 Ok(())
261}
262
263fn print_report_pretty(report: &ReconcileReport) {
264 println!();
265 println!(" Export : {}", report.export_name);
266 println!(" Run : {}", report.run_id);
267 println!(" Strategy : {}", report.strategy);
268 println!(
269 " Partitions: {} ({} match, {} mismatch, {} unknown)",
270 report.summary.total_partitions,
271 report.summary.matches,
272 report.summary.mismatches,
273 report.summary.unknown,
274 );
275 println!(
276 " Rows : source {} / exported {}",
277 report.summary.total_source_rows, report.summary.total_exported_rows,
278 );
279
280 let repair = report.repair_candidates();
281 if repair.is_empty() {
282 println!(" Status : all partitions match");
283 } else {
284 println!(" Repair candidates:");
285 for p in repair {
286 println!(" • {} — {}", p.identifier, format_status_note(p));
287 }
288 }
289 println!();
290}
291
292fn format_status_note(p: &PartitionResult) -> String {
293 let s = match (p.source_count, p.exported_count) {
294 (Some(s), Some(e)) => format!("source={s}, exported={e}"),
295 (Some(s), None) => format!("source={s}, exported=n/a"),
296 (None, Some(e)) => format!("source=n/a, exported={e}"),
297 (None, None) => "no counts".to_string(),
298 };
299 if p.note.is_empty() {
300 s
301 } else {
302 format!("{s} ({})", p.note)
303 }
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType, VerifyMode,
    };
    use crate::plan::{ChunkedPlan, ExtractionStrategy, PartitionStatus};
    use crate::state::ChunkTaskInfo;
    use crate::tuning::SourceTuning;

    /// Fixture: a chunked plan over `orders.id` (`chunk_size: 100`, `dense: false`, no
    /// `by_days`) reading from a Postgres source.
    fn chunked_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT * FROM orders".into(),
            strategy: ExtractionStrategy::Chunked(ChunkedPlan {
                column: "id".into(),
                chunk_size: 100,
                chunk_count: None,
                parallel: 1,
                dense: false,
                by_days: None,
                checkpoint: true,
                max_attempts: 3,
            }),
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// Builds a chunk task record with a single attempt, no error and no output file.
    fn chunk_task(
        idx: i64,
        start: &str,
        end: &str,
        status: &str,
        rows: Option<i64>,
    ) -> ChunkTaskInfo {
        ChunkTaskInfo {
            chunk_index: idx,
            start_key: start.into(),
            end_key: end.into(),
            status: status.into(),
            attempts: 1,
            last_error: None,
            rows_written: rows,
            file_name: None,
        }
    }

    #[test]
    fn matches_and_mismatches_are_classified() {
        let plan = chunked_plan();
        let tasks = [
            chunk_task(0, "1", "100", "completed", Some(42)),
            chunk_task(1, "101", "200", "completed", Some(30)),
        ];
        // The source agrees on chunk 0 (42) and disagrees on chunk 1 (33 vs. 30 exported).
        let mut source_counts = [42_i64, 33].into_iter();
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_sql| Ok(source_counts.next()))
            .expect("reconcile should succeed");

        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].status, PartitionStatus::Match);
        assert_eq!(parts[1].status, PartitionStatus::Mismatch);
        assert_eq!(parts[1].source_count, Some(33));
        assert_eq!(parts[1].exported_count, Some(30));
    }

    #[test]
    fn unfinished_task_is_unknown_and_does_not_hide_source_count() {
        let plan = chunked_plan();
        let tasks = [chunk_task(0, "1", "100", "failed", None)];
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_sql| Ok(Some(42)))
            .expect("reconcile should succeed");

        assert_eq!(parts[0].status, PartitionStatus::Unknown);
        assert_eq!(parts[0].source_count, Some(42));
        assert_eq!(parts[0].exported_count, None);
    }

    #[test]
    fn unparseable_chunk_keys_are_unknown_without_source_lookup() {
        let plan = chunked_plan();
        let tasks = [chunk_task(0, "alpha", "omega", "completed", Some(5))];
        let mut lookups = 0_u32;
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_sql| {
            lookups += 1;
            Ok(Some(99))
        })
        .expect("reconcile should succeed");

        assert_eq!(
            lookups, 0,
            "reconcile must skip source count for unparseable chunk keys"
        );
        assert_eq!(parts[0].status, PartitionStatus::Unknown);
        assert_eq!(parts[0].exported_count, Some(5));
    }

    #[test]
    fn chunk_query_passes_through_chunked_math() {
        let plan = chunked_plan();
        let tasks = [chunk_task(0, "10", "20", "completed", Some(5))];
        let mut seen_sql: Option<String> = None;
        reconcile_chunked_tasks(&plan, &tasks, |sql| {
            seen_sql = Some(sql.to_owned());
            Ok(Some(5))
        })
        .expect("reconcile should succeed");

        let sql = seen_sql.expect("count_source should have been called");
        assert!(sql.contains("BETWEEN 10 AND 20"), "got: {sql}");
        assert!(sql.contains("\"id\""), "identifier must be quoted: {sql}");
    }

    /// Builds a summary with the given bucket sizes; row totals are zero.
    fn summary(matches: usize, mismatches: usize, unknown: usize) -> ReconcileSummary {
        ReconcileSummary {
            total_partitions: matches + mismatches + unknown,
            matches,
            mismatches,
            unknown,
            total_source_rows: 0,
            total_exported_rows: 0,
        }
    }

    #[test]
    fn reconcile_exit_fails_on_mismatch() {
        let message = enforce_reconcile_exit(&summary(3, 1, 0))
            .expect_err("a mismatch must fail the command")
            .to_string();
        assert!(
            message.contains("disagree with the source"),
            "got: {message}"
        );
    }

    #[test]
    fn reconcile_exit_passes_when_all_match() {
        enforce_reconcile_exit(&summary(4, 0, 0)).expect("all-match summary must pass");
    }

    #[test]
    fn reconcile_exit_does_not_fail_on_unknown_only() {
        enforce_reconcile_exit(&summary(2, 0, 3)).expect("unknown-only summary must pass");
    }

    #[test]
    fn reconcile_exit_fails_when_mismatch_and_unknown_coexist() {
        let outcome = enforce_reconcile_exit(&summary(0, 1, 2));
        assert!(outcome.is_err());
    }
}