1use std::collections::HashMap;
13use std::path::Path;
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::plan::{
18 ExtractionStrategy, PartitionKind, PartitionResult, ReconcileReport, ResolvedRunPlan,
19 build_plan,
20};
21use crate::source;
22use crate::state::{ChunkTaskInfo, StateStore};
23
24use super::chunked::build_chunk_query_sql;
25
/// Output format selected for the reconcile report.
pub enum ReconcileOutputFormat {
    /// Human-readable summary printed to stdout.
    Pretty,
    /// JSON produced by `ReconcileReport::to_json_pretty`: `None` prints it to
    /// stdout, `Some(path)` writes it to the file at `path`.
    Json(Option<String>),
}
33
34pub fn run_reconcile_command(
35 config_path: &str,
36 export_name: &str,
37 params: Option<&HashMap<String, String>>,
38 format: ReconcileOutputFormat,
39) -> Result<()> {
40 let config = Config::load_with_params(config_path, params)?;
41 let config_dir = Path::new(config_path)
42 .parent()
43 .unwrap_or_else(|| Path::new("."));
44
45 let export = config
46 .exports
47 .iter()
48 .find(|e| e.name == export_name)
49 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
50
51 let plan = build_plan(&config, export, config_dir, false, false, false, params)?;
52
53 let state_path = config_dir.join(".rivet_state.db");
54 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
55
56 let report = match &plan.strategy {
57 ExtractionStrategy::Chunked(_) => reconcile_chunked(&plan, &state, export)?,
58 ExtractionStrategy::TimeWindow { .. } => {
59 anyhow::bail!(
60 "reconcile: time-window mode is not supported in v1 (Epic F). \
61 Convert to chunked mode with `chunk_by_days` for partition-level reconcile."
62 );
63 }
64 ExtractionStrategy::Snapshot
65 | ExtractionStrategy::Incremental(_)
66 | ExtractionStrategy::Keyset(_) => {
67 anyhow::bail!(
68 "reconcile: '{}' mode has no natural partitions — use `rivet run --reconcile` for a whole-export count check",
69 plan.strategy.mode_label()
70 );
71 }
72 };
73
74 emit_report(&report, &format)?;
75 Ok(())
76}
77
/// Crate-visible entry point for reconciling a chunked export.
///
/// Delegates directly to `reconcile_chunked_inner` with the same `plan` and
/// `state`, returning its report or error unchanged.
pub(crate) fn reconcile_chunked_fresh(
    plan: &ResolvedRunPlan,
    state: &StateStore,
) -> Result<ReconcileReport> {
    reconcile_chunked_inner(plan, state)
}
87
/// Reconciles a chunked export on behalf of the `reconcile` command.
///
/// Delegates directly to `reconcile_chunked_inner`; the `_export` argument is
/// accepted but not used by this function.
fn reconcile_chunked(
    plan: &ResolvedRunPlan,
    state: &StateStore,
    _export: &ExportConfig,
) -> Result<ReconcileReport> {
    reconcile_chunked_inner(plan, state)
}
95
96fn reconcile_chunked_inner(plan: &ResolvedRunPlan, state: &StateStore) -> Result<ReconcileReport> {
97 let (run_id, _plan_hash, _status, _updated) = state
98 .get_latest_chunk_run(&plan.export_name)?
99 .ok_or_else(|| {
100 anyhow::anyhow!(
101 "reconcile: no chunk run recorded for export '{}'. \
102 Enable `chunk_checkpoint: true` and run the export first.",
103 plan.export_name
104 )
105 })?;
106
107 let tasks = state.list_chunk_tasks_for_run(&run_id)?;
108 if tasks.is_empty() {
109 anyhow::bail!(
110 "reconcile: chunk run '{}' for export '{}' has no tasks",
111 run_id,
112 plan.export_name
113 );
114 }
115
116 let mut src = source::create_source(&plan.source)?;
117 let partitions = reconcile_chunked_tasks(plan, &tasks, |chunk_query| {
118 let count_sql = format!("SELECT COUNT(*) FROM ({chunk_query}) AS _rc");
119 let raw = src.query_scalar(&count_sql)?;
120 Ok(raw.and_then(|s| s.trim().parse::<i64>().ok()))
121 })?;
122
123 let report = ReconcileReport::new(
124 plan.export_name.clone(),
125 run_id.clone(),
126 plan.strategy.mode_label().to_string(),
127 partitions,
128 );
129
130 if report.summary.mismatches == 0 && report.summary.unknown == 0 {
132 let highest = tasks
133 .iter()
134 .filter(|t| t.status == "completed")
135 .map(|t| t.chunk_index)
136 .max();
137 if let Some(idx) = highest
138 && let Err(e) = state.record_verified_chunked(&plan.export_name, idx, &run_id)
139 {
140 log::warn!(
141 "export '{}': verified boundary update failed: {:#}",
142 plan.export_name,
143 e
144 );
145 }
146 }
147
148 Ok(report)
149}
150
151pub(crate) fn reconcile_chunked_tasks<F>(
156 plan: &ResolvedRunPlan,
157 tasks: &[ChunkTaskInfo],
158 mut count_source: F,
159) -> Result<Vec<PartitionResult>>
160where
161 F: FnMut(&str) -> Result<Option<i64>>,
162{
163 let cp = match &plan.strategy {
164 ExtractionStrategy::Chunked(cp) => cp,
165 _ => anyhow::bail!("reconcile_chunked_tasks requires Chunked strategy"),
166 };
167
168 let mut out: Vec<PartitionResult> = Vec::with_capacity(tasks.len());
169 for t in tasks {
170 let exported = if t.status == "completed" {
171 t.rows_written
172 } else {
173 None
174 };
175
176 let (start, end) = match (t.start_key.parse::<i64>(), t.end_key.parse::<i64>()) {
177 (Ok(s), Ok(e)) => (s, e),
178 _ => {
179 out.push(PartitionResult::classify(
181 PartitionKind::Chunk,
182 format!("chunk {} [{}..{}]", t.chunk_index, t.start_key, t.end_key),
183 None,
184 exported,
185 ));
186 continue;
187 }
188 };
189
190 let chunk_query = build_chunk_query_sql(
191 &plan.base_query,
192 &cp.column,
193 start,
194 end,
195 cp.dense,
196 cp.by_days.is_some(),
197 plan.source.source_type,
198 );
199 let source_count = count_source(&chunk_query)?;
200
201 out.push(PartitionResult::classify(
202 PartitionKind::Chunk,
203 format!("chunk {} [{}..{}]", t.chunk_index, start, end),
204 source_count,
205 exported,
206 ));
207 }
208 Ok(out)
209}
210
211fn emit_report(report: &ReconcileReport, format: &ReconcileOutputFormat) -> Result<()> {
212 match format {
213 ReconcileOutputFormat::Pretty => {
214 print_report_pretty(report);
215 }
216 ReconcileOutputFormat::Json(None) => {
217 println!("{}", report.to_json_pretty()?);
218 }
219 ReconcileOutputFormat::Json(Some(path)) => {
220 let json = report.to_json_pretty()?;
221 std::fs::write(path, &json)
222 .map_err(|e| anyhow::anyhow!("cannot write reconcile report '{}': {}", path, e))?;
223 println!("Reconcile report written to: {}", path);
224 }
225 }
226 Ok(())
227}
228
229fn print_report_pretty(report: &ReconcileReport) {
230 println!();
231 println!(" Export : {}", report.export_name);
232 println!(" Run : {}", report.run_id);
233 println!(" Strategy : {}", report.strategy);
234 println!(
235 " Partitions: {} ({} match, {} mismatch, {} unknown)",
236 report.summary.total_partitions,
237 report.summary.matches,
238 report.summary.mismatches,
239 report.summary.unknown,
240 );
241 println!(
242 " Rows : source {} / exported {}",
243 report.summary.total_source_rows, report.summary.total_exported_rows,
244 );
245
246 let repair = report.repair_candidates();
247 if repair.is_empty() {
248 println!(" Status : all partitions match");
249 } else {
250 println!(" Repair candidates:");
251 for p in repair {
252 println!(" • {} — {}", p.identifier, format_status_note(p));
253 }
254 }
255 println!();
256}
257
258fn format_status_note(p: &PartitionResult) -> String {
259 let s = match (p.source_count, p.exported_count) {
260 (Some(s), Some(e)) => format!("source={s}, exported={e}"),
261 (Some(s), None) => format!("source={s}, exported=n/a"),
262 (None, Some(e)) => format!("source=n/a, exported={e}"),
263 (None, None) => "no counts".to_string(),
264 };
265 if p.note.is_empty() {
266 s
267 } else {
268 format!("{s} ({})", p.note)
269 }
270}
271
// Unit tests for `reconcile_chunked_tasks`, driven by a stubbed `count_source`
// closure so that no source connection is needed.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{ChunkedPlan, ExtractionStrategy};
    use crate::state::ChunkTaskInfo;
    use crate::tuning::SourceTuning;

    /// Builds a plan for the `orders` export using a chunked strategy on the
    /// `id` column (not dense, no `by_days`) against a Postgres source.
    fn chunked_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT * FROM orders".into(),
            strategy: ExtractionStrategy::Chunked(ChunkedPlan {
                column: "id".into(),
                chunk_size: 100,
                chunk_count: None,
                parallel: 1,
                dense: false,
                by_days: None,
                checkpoint: true,
                max_attempts: 3,
            }),
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// Builds a chunk task with the given index, key range, status and
    /// written-row count; the remaining fields use fixed placeholder values.
    fn task(idx: i64, start: &str, end: &str, status: &str, rows: Option<i64>) -> ChunkTaskInfo {
        ChunkTaskInfo {
            chunk_index: idx,
            start_key: start.into(),
            end_key: end.into(),
            status: status.into(),
            attempts: 1,
            last_error: None,
            rows_written: rows,
            file_name: None,
        }
    }

    // Equal source/exported counts classify as Match; differing counts as Mismatch.
    #[test]
    fn matches_and_mismatches_are_classified() {
        let plan = chunked_plan();
        let tasks = vec![
            task(0, "1", "100", "completed", Some(42)),
            task(1, "101", "200", "completed", Some(30)),
        ];
        let mut n = 0;
        // The stub returns 42 for the first chunk and 33 for every later one.
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            n += 1;
            Ok(Some(if n == 1 { 42 } else { 33 }))
        })
        .unwrap();

        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Match);
        assert_eq!(parts[1].status, crate::plan::PartitionStatus::Mismatch);
        assert_eq!(parts[1].source_count, Some(33));
        assert_eq!(parts[1].exported_count, Some(30));
    }

    // A task that is not "completed" has no exported count, yet the source
    // count is still looked up and reported.
    #[test]
    fn unfinished_task_is_unknown_and_does_not_hide_source_count() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "1", "100", "failed", None)];
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| Ok(Some(42))).unwrap();
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Unknown);
        assert_eq!(parts[0].source_count, Some(42));
        assert_eq!(parts[0].exported_count, None);
    }

    // Keys that do not parse as integers yield Unknown and must not trigger a
    // source count query.
    #[test]
    fn unparseable_chunk_keys_are_unknown_without_source_lookup() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "alpha", "omega", "completed", Some(5))];
        let mut called = false;
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            called = true;
            Ok(Some(99))
        })
        .unwrap();
        assert!(
            !called,
            "reconcile must skip source count for unparseable chunk keys"
        );
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Unknown);
        assert_eq!(parts[0].exported_count, Some(5));
    }

    // The SQL handed to `count_source` carries the chunk bounds and a quoted
    // chunk column.
    #[test]
    fn chunk_query_passes_through_chunked_math() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "10", "20", "completed", Some(5))];
        let mut captured = String::new();
        reconcile_chunked_tasks(&plan, &tasks, |q| {
            captured = q.to_string();
            Ok(Some(5))
        })
        .unwrap();
        assert!(captured.contains("BETWEEN 10 AND 20"), "got: {captured}");
        assert!(
            captured.contains("\"id\""),
            "identifier must be quoted: {captured}"
        );
    }
}
416}