1use std::collections::HashMap;
13use std::path::Path;
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::plan::{
18 ExtractionStrategy, PartitionKind, PartitionResult, ReconcileReport, ReconcileSummary,
19 ResolvedRunPlan, build_plan,
20};
21use crate::source;
22use crate::state::{ChunkTaskInfo, StateStore};
23
24use super::chunked::build_chunk_query_sql;
25
/// How the reconcile report is emitted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReconcileOutputFormat {
    /// Human-readable summary printed to stdout.
    Pretty,
    /// Pretty-printed JSON: printed to stdout when `None`, otherwise written
    /// to the given file path.
    Json(Option<String>),
}
33
34pub fn run_reconcile_command(
35 config_path: &str,
36 export_name: &str,
37 params: Option<&HashMap<String, String>>,
38 format: ReconcileOutputFormat,
39) -> Result<()> {
40 let config = Config::load_with_params(config_path, params)?;
41 let config_dir = Path::new(config_path)
42 .parent()
43 .unwrap_or_else(|| Path::new("."));
44
45 let export = config
46 .exports
47 .iter()
48 .find(|e| e.name == export_name)
49 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
50
51 let plan = build_plan(&config, export, config_dir, false, false, false, params)?;
52
53 let state_path = config_dir.join(".rivet_state.db");
54 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
55
56 let report = match &plan.strategy {
57 ExtractionStrategy::Chunked(_) => reconcile_chunked(&plan, &state, export)?,
58 ExtractionStrategy::TimeWindow { .. } => {
59 anyhow::bail!(
60 "reconcile: time-window mode is not supported in v1 (Epic F). \
61 Convert to chunked mode with `chunk_by_days` for partition-level reconcile."
62 );
63 }
64 ExtractionStrategy::Snapshot
65 | ExtractionStrategy::Incremental(_)
66 | ExtractionStrategy::Keyset(_) => {
67 anyhow::bail!(
68 "reconcile: '{}' mode has no natural partitions — use `rivet run --reconcile` for a whole-export count check",
69 plan.strategy.mode_label()
70 );
71 }
72 };
73
74 emit_report(&report, &format)?;
75 enforce_reconcile_exit(&report.summary)
76}
77
78fn enforce_reconcile_exit(summary: &ReconcileSummary) -> Result<()> {
94 if summary.unknown > 0 {
95 log::warn!(
96 "reconcile: {} of {} partition(s) could not be verified (incomplete chunk or \
97 non-integer keyset key — no source re-count); not counted as a mismatch",
98 summary.unknown,
99 summary.total_partitions
100 );
101 }
102 if summary.mismatches > 0 {
103 return Err(crate::error::DataIntegrityError::new(format!(
108 "reconcile: {} of {} partition(s) disagree with the source — see the report above",
109 summary.mismatches, summary.total_partitions
110 ))
111 .into());
112 }
113 Ok(())
114}
115
116pub(crate) fn reconcile_chunked_fresh(
120 plan: &ResolvedRunPlan,
121 state: &StateStore,
122) -> Result<ReconcileReport> {
123 reconcile_chunked_inner(plan, state)
124}
125
126fn reconcile_chunked(
127 plan: &ResolvedRunPlan,
128 state: &StateStore,
129 _export: &ExportConfig,
130) -> Result<ReconcileReport> {
131 reconcile_chunked_inner(plan, state)
132}
133
134fn reconcile_chunked_inner(plan: &ResolvedRunPlan, state: &StateStore) -> Result<ReconcileReport> {
135 let (run_id, _plan_hash, _status, _updated) = state
136 .get_latest_chunk_run(&plan.export_name)?
137 .ok_or_else(|| {
138 anyhow::anyhow!(
139 "reconcile: no chunk run recorded for export '{}'. \
140 Enable `chunk_checkpoint: true` and run the export first.",
141 plan.export_name
142 )
143 })?;
144
145 let tasks = state.list_chunk_tasks_for_run(&run_id)?;
146 if tasks.is_empty() {
147 anyhow::bail!(
148 "reconcile: chunk run '{}' for export '{}' has no tasks",
149 run_id,
150 plan.export_name
151 );
152 }
153
154 let mut src = source::create_source(&plan.source)?;
155 let partitions = reconcile_chunked_tasks(plan, &tasks, |chunk_query| {
156 let count_sql = format!("SELECT COUNT(*) FROM ({chunk_query}) AS _rc");
157 let raw = src.query_scalar(&count_sql)?;
158 Ok(raw.and_then(|s| s.trim().parse::<i64>().ok()))
159 })?;
160
161 let report = ReconcileReport::new(
162 plan.export_name.clone(),
163 run_id.clone(),
164 plan.strategy.mode_label().to_string(),
165 partitions,
166 );
167
168 if report.summary.mismatches == 0 && report.summary.unknown == 0 {
170 let highest = tasks
171 .iter()
172 .filter(|t| t.status == "completed")
173 .map(|t| t.chunk_index)
174 .max();
175 if let Some(idx) = highest
176 && let Err(e) = state.record_verified_chunked(&plan.export_name, idx, &run_id)
177 {
178 log::warn!(
179 "export '{}': verified boundary update failed: {:#}",
180 plan.export_name,
181 e
182 );
183 }
184 }
185
186 Ok(report)
187}
188
189pub(crate) fn reconcile_chunked_tasks<F>(
194 plan: &ResolvedRunPlan,
195 tasks: &[ChunkTaskInfo],
196 mut count_source: F,
197) -> Result<Vec<PartitionResult>>
198where
199 F: FnMut(&str) -> Result<Option<i64>>,
200{
201 let cp = match &plan.strategy {
202 ExtractionStrategy::Chunked(cp) => cp,
203 _ => anyhow::bail!("reconcile_chunked_tasks requires Chunked strategy"),
204 };
205
206 let mut out: Vec<PartitionResult> = Vec::with_capacity(tasks.len());
207 for t in tasks {
208 let exported = if t.status == "completed" {
209 t.rows_written
210 } else {
211 None
212 };
213
214 let (start, end) = match (t.start_key.parse::<i64>(), t.end_key.parse::<i64>()) {
215 (Ok(s), Ok(e)) => (s, e),
216 _ => {
217 out.push(PartitionResult::classify(
219 PartitionKind::Chunk,
220 format!("chunk {} [{}..{}]", t.chunk_index, t.start_key, t.end_key),
221 None,
222 exported,
223 ));
224 continue;
225 }
226 };
227
228 let chunk_query = build_chunk_query_sql(
229 &plan.base_query,
230 &cp.column,
231 start,
232 end,
233 cp.dense,
234 cp.by_days.is_some(),
235 plan.source.source_type,
236 );
237 let source_count = count_source(&chunk_query)?;
238
239 out.push(PartitionResult::classify(
240 PartitionKind::Chunk,
241 format!("chunk {} [{}..{}]", t.chunk_index, start, end),
242 source_count,
243 exported,
244 ));
245 }
246 Ok(out)
247}
248
249fn emit_report(report: &ReconcileReport, format: &ReconcileOutputFormat) -> Result<()> {
250 match format {
251 ReconcileOutputFormat::Pretty => {
252 print_report_pretty(report);
253 }
254 ReconcileOutputFormat::Json(None) => {
255 println!("{}", report.to_json_pretty()?);
256 }
257 ReconcileOutputFormat::Json(Some(path)) => {
258 let json = report.to_json_pretty()?;
259 std::fs::write(path, &json)
260 .map_err(|e| anyhow::anyhow!("cannot write reconcile report '{}': {}", path, e))?;
261 println!("Reconcile report written to: {}", path);
262 }
263 }
264 Ok(())
265}
266
267fn print_report_pretty(report: &ReconcileReport) {
268 println!();
269 println!(" Export : {}", report.export_name);
270 println!(" Run : {}", report.run_id);
271 println!(" Strategy : {}", report.strategy);
272 println!(
273 " Partitions: {} ({} match, {} mismatch, {} unknown)",
274 report.summary.total_partitions,
275 report.summary.matches,
276 report.summary.mismatches,
277 report.summary.unknown,
278 );
279 println!(
280 " Rows : source {} / exported {}",
281 report.summary.total_source_rows, report.summary.total_exported_rows,
282 );
283
284 let repair = report.repair_candidates();
285 if repair.is_empty() {
286 println!(" Status : all partitions match");
287 } else {
288 println!(" Repair candidates:");
289 for p in repair {
290 println!(" • {} — {}", p.identifier, format_status_note(p));
291 }
292 }
293 println!();
294}
295
296fn format_status_note(p: &PartitionResult) -> String {
297 let s = match (p.source_count, p.exported_count) {
298 (Some(s), Some(e)) => format!("source={s}, exported={e}"),
299 (Some(s), None) => format!("source={s}, exported=n/a"),
300 (None, Some(e)) => format!("source=n/a, exported={e}"),
301 (None, None) => "no counts".to_string(),
302 };
303 if p.note.is_empty() {
304 s
305 } else {
306 format!("{s} ({})", p.note)
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{ChunkedPlan, ExtractionStrategy, PartitionStatus};
    use crate::state::ChunkTaskInfo;
    use crate::tuning::SourceTuning;

    /// A chunked plan over `orders.id` (100-row sparse chunks, checkpointing
    /// on) that targets a Postgres source and a local destination.
    fn chunked_plan() -> ResolvedRunPlan {
        let chunking = ChunkedPlan {
            column: "id".into(),
            chunk_size: 100,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            checkpoint: true,
            max_attempts: 3,
        };
        let postgres = SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://localhost/test".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        };
        let local_out = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("./out".into()),
            ..Default::default()
        };

        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT * FROM orders".into(),
            strategy: ExtractionStrategy::Chunked(chunking),
            source: postgres,
            destination: local_out,
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// Builds a chunk task record with a single attempt, no error and no output file.
    fn task(idx: i64, start: &str, end: &str, status: &str, rows: Option<i64>) -> ChunkTaskInfo {
        ChunkTaskInfo {
            chunk_index: idx,
            start_key: start.into(),
            end_key: end.into(),
            status: status.into(),
            attempts: 1,
            last_error: None,
            rows_written: rows,
            file_name: None,
        }
    }

    #[test]
    fn matches_and_mismatches_are_classified() {
        let plan = chunked_plan();
        let tasks = vec![
            task(0, "1", "100", "completed", Some(42)),
            task(1, "101", "200", "completed", Some(30)),
        ];
        // The source agrees with chunk 0 but holds three more rows than chunk 1 exported.
        let mut source_counts = [42, 33].into_iter();
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_sql| Ok(source_counts.next()))
            .expect("reconcile should succeed");

        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].status, PartitionStatus::Match);
        assert_eq!(parts[1].status, PartitionStatus::Mismatch);
        assert_eq!(parts[1].source_count, Some(33));
        assert_eq!(parts[1].exported_count, Some(30));
    }

    #[test]
    fn unfinished_task_is_unknown_and_does_not_hide_source_count() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "1", "100", "failed", None)];

        let parts = reconcile_chunked_tasks(&plan, &tasks, |_sql| Ok(Some(42)))
            .expect("reconcile should succeed");

        let only = &parts[0];
        assert_eq!(only.status, PartitionStatus::Unknown);
        assert_eq!(only.source_count, Some(42));
        assert_eq!(only.exported_count, None);
    }

    #[test]
    fn unparseable_chunk_keys_are_unknown_without_source_lookup() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "alpha", "omega", "completed", Some(5))];
        let mut lookups = 0_u32;

        let parts = reconcile_chunked_tasks(&plan, &tasks, |_sql| {
            lookups += 1;
            Ok(Some(99))
        })
        .expect("reconcile should succeed");

        assert_eq!(
            lookups, 0,
            "reconcile must skip source count for unparseable chunk keys"
        );
        assert_eq!(parts[0].status, PartitionStatus::Unknown);
        assert_eq!(parts[0].exported_count, Some(5));
    }

    #[test]
    fn chunk_query_passes_through_chunked_math() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "10", "20", "completed", Some(5))];
        let mut seen_sql: Option<String> = None;

        reconcile_chunked_tasks(&plan, &tasks, |sql| {
            seen_sql = Some(sql.to_owned());
            Ok(Some(5))
        })
        .expect("reconcile should succeed");

        let sql = seen_sql.expect("count_source should be called for integer chunk keys");
        assert!(sql.contains("BETWEEN 10 AND 20"), "got: {sql}");
        assert!(sql.contains("\"id\""), "identifier must be quoted: {sql}");
    }

    /// A summary with the given partition breakdown and zero row totals.
    fn summary(matches: usize, mismatches: usize, unknown: usize) -> ReconcileSummary {
        ReconcileSummary {
            total_partitions: matches + mismatches + unknown,
            matches,
            mismatches,
            unknown,
            total_source_rows: 0,
            total_exported_rows: 0,
        }
    }

    #[test]
    fn reconcile_exit_fails_on_mismatch() {
        let err = enforce_reconcile_exit(&summary(3, 1, 0))
            .expect_err("a mismatch must fail the command");
        let rendered = err.to_string();
        assert!(
            rendered.contains("disagree with the source"),
            "got: {rendered}"
        );
        assert_eq!(
            crate::error::classify_exit(&err),
            3,
            "a reconcile mismatch must classify as data-integrity (exit 3)"
        );
    }

    #[test]
    fn reconcile_exit_passes_when_all_match() {
        assert!(enforce_reconcile_exit(&summary(4, 0, 0)).is_ok());
    }

    #[test]
    fn reconcile_exit_does_not_fail_on_unknown_only() {
        // Unverifiable partitions warn but never fail on their own.
        assert!(enforce_reconcile_exit(&summary(2, 0, 3)).is_ok());
    }

    #[test]
    fn reconcile_exit_fails_when_mismatch_and_unknown_coexist() {
        assert!(enforce_reconcile_exit(&summary(0, 1, 2)).is_err());
    }
}