1use std::collections::HashMap;
13use std::path::Path;
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::plan::{
18 ExtractionStrategy, PartitionKind, PartitionResult, ReconcileReport, ResolvedRunPlan,
19 build_plan,
20};
21use crate::source;
22use crate::state::{ChunkTaskInfo, StateStore};
23
24use super::chunked::build_chunk_query_sql;
25
/// Output format for the report produced by `run_reconcile_command`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReconcileOutputFormat {
    /// Human-readable summary printed to stdout.
    Pretty,
    /// Pretty-printed JSON. `None` prints it to stdout; `Some(path)` writes it
    /// to `path` and prints a confirmation line to stdout.
    Json(Option<String>),
}
33
34pub fn run_reconcile_command(
35 config_path: &str,
36 export_name: &str,
37 params: Option<&HashMap<String, String>>,
38 format: ReconcileOutputFormat,
39) -> Result<()> {
40 let config = Config::load_with_params(config_path, params)?;
41 let config_dir = Path::new(config_path)
42 .parent()
43 .unwrap_or_else(|| Path::new("."));
44
45 let export = config
46 .exports
47 .iter()
48 .find(|e| e.name == export_name)
49 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
50
51 let plan = build_plan(&config, export, config_dir, false, false, false, params)?;
52
53 let state_path = config_dir.join(".rivet_state.db");
54 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
55
56 let report = match &plan.strategy {
57 ExtractionStrategy::Chunked(_) => reconcile_chunked(&plan, &state, export)?,
58 ExtractionStrategy::TimeWindow { .. } => {
59 anyhow::bail!(
60 "reconcile: time-window mode is not supported in v1 (Epic F). \
61 Convert to chunked mode with `chunk_by_days` for partition-level reconcile."
62 );
63 }
64 ExtractionStrategy::Snapshot | ExtractionStrategy::Incremental(_) => {
65 anyhow::bail!(
66 "reconcile: '{}' mode has no natural partitions — use `rivet run --reconcile` for a whole-export count check",
67 plan.strategy.mode_label()
68 );
69 }
70 };
71
72 emit_report(&report, &format)?;
73 Ok(())
74}
75
76pub(crate) fn reconcile_chunked_fresh(
80 plan: &ResolvedRunPlan,
81 state: &StateStore,
82) -> Result<ReconcileReport> {
83 reconcile_chunked_inner(plan, state)
84}
85
86fn reconcile_chunked(
87 plan: &ResolvedRunPlan,
88 state: &StateStore,
89 _export: &ExportConfig,
90) -> Result<ReconcileReport> {
91 reconcile_chunked_inner(plan, state)
92}
93
94fn reconcile_chunked_inner(plan: &ResolvedRunPlan, state: &StateStore) -> Result<ReconcileReport> {
95 let (run_id, _plan_hash, _status, _updated) = state
96 .get_latest_chunk_run(&plan.export_name)?
97 .ok_or_else(|| {
98 anyhow::anyhow!(
99 "reconcile: no chunk run recorded for export '{}'. \
100 Enable `chunk_checkpoint: true` and run the export first.",
101 plan.export_name
102 )
103 })?;
104
105 let tasks = state.list_chunk_tasks_for_run(&run_id)?;
106 if tasks.is_empty() {
107 anyhow::bail!(
108 "reconcile: chunk run '{}' for export '{}' has no tasks",
109 run_id,
110 plan.export_name
111 );
112 }
113
114 let mut src = source::create_source(&plan.source)?;
115 let partitions = reconcile_chunked_tasks(plan, &tasks, |chunk_query| {
116 let count_sql = format!("SELECT COUNT(*) FROM ({chunk_query}) AS _rc");
117 let raw = src.query_scalar(&count_sql)?;
118 Ok(raw.and_then(|s| s.trim().parse::<i64>().ok()))
119 })?;
120
121 let report = ReconcileReport::new(
122 plan.export_name.clone(),
123 run_id.clone(),
124 plan.strategy.mode_label().to_string(),
125 partitions,
126 );
127
128 if report.summary.mismatches == 0 && report.summary.unknown == 0 {
130 let highest = tasks
131 .iter()
132 .filter(|t| t.status == "completed")
133 .map(|t| t.chunk_index)
134 .max();
135 if let Some(idx) = highest
136 && let Err(e) = state.record_verified_chunked(&plan.export_name, idx, &run_id)
137 {
138 log::warn!(
139 "export '{}': verified boundary update failed: {:#}",
140 plan.export_name,
141 e
142 );
143 }
144 }
145
146 Ok(report)
147}
148
149pub(crate) fn reconcile_chunked_tasks<F>(
154 plan: &ResolvedRunPlan,
155 tasks: &[ChunkTaskInfo],
156 mut count_source: F,
157) -> Result<Vec<PartitionResult>>
158where
159 F: FnMut(&str) -> Result<Option<i64>>,
160{
161 let cp = match &plan.strategy {
162 ExtractionStrategy::Chunked(cp) => cp,
163 _ => anyhow::bail!("reconcile_chunked_tasks requires Chunked strategy"),
164 };
165
166 let mut out: Vec<PartitionResult> = Vec::with_capacity(tasks.len());
167 for t in tasks {
168 let exported = if t.status == "completed" {
169 t.rows_written
170 } else {
171 None
172 };
173
174 let (start, end) = match (t.start_key.parse::<i64>(), t.end_key.parse::<i64>()) {
175 (Ok(s), Ok(e)) => (s, e),
176 _ => {
177 out.push(PartitionResult::classify(
179 PartitionKind::Chunk,
180 format!("chunk {} [{}..{}]", t.chunk_index, t.start_key, t.end_key),
181 None,
182 exported,
183 ));
184 continue;
185 }
186 };
187
188 let chunk_query = build_chunk_query_sql(
189 &plan.base_query,
190 &cp.column,
191 start,
192 end,
193 cp.dense,
194 cp.by_days.is_some(),
195 plan.source.source_type,
196 );
197 let source_count = count_source(&chunk_query)?;
198
199 out.push(PartitionResult::classify(
200 PartitionKind::Chunk,
201 format!("chunk {} [{}..{}]", t.chunk_index, start, end),
202 source_count,
203 exported,
204 ));
205 }
206 Ok(out)
207}
208
209fn emit_report(report: &ReconcileReport, format: &ReconcileOutputFormat) -> Result<()> {
210 match format {
211 ReconcileOutputFormat::Pretty => {
212 print_report_pretty(report);
213 }
214 ReconcileOutputFormat::Json(None) => {
215 println!("{}", report.to_json_pretty()?);
216 }
217 ReconcileOutputFormat::Json(Some(path)) => {
218 let json = report.to_json_pretty()?;
219 std::fs::write(path, &json)
220 .map_err(|e| anyhow::anyhow!("cannot write reconcile report '{}': {}", path, e))?;
221 println!("Reconcile report written to: {}", path);
222 }
223 }
224 Ok(())
225}
226
227fn print_report_pretty(report: &ReconcileReport) {
228 println!();
229 println!(" Export : {}", report.export_name);
230 println!(" Run : {}", report.run_id);
231 println!(" Strategy : {}", report.strategy);
232 println!(
233 " Partitions: {} ({} match, {} mismatch, {} unknown)",
234 report.summary.total_partitions,
235 report.summary.matches,
236 report.summary.mismatches,
237 report.summary.unknown,
238 );
239 println!(
240 " Rows : source {} / exported {}",
241 report.summary.total_source_rows, report.summary.total_exported_rows,
242 );
243
244 let repair = report.repair_candidates();
245 if repair.is_empty() {
246 println!(" Status : all partitions match");
247 } else {
248 println!(" Repair candidates:");
249 for p in repair {
250 println!(" • {} — {}", p.identifier, format_status_note(p));
251 }
252 }
253 println!();
254}
255
256fn format_status_note(p: &PartitionResult) -> String {
257 let s = match (p.source_count, p.exported_count) {
258 (Some(s), Some(e)) => format!("source={s}, exported={e}"),
259 (Some(s), None) => format!("source={s}, exported=n/a"),
260 (None, Some(e)) => format!("source=n/a, exported={e}"),
261 (None, None) => "no counts".to_string(),
262 };
263 if p.note.is_empty() {
264 s
265 } else {
266 format!("{s} ({})", p.note)
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{ChunkedPlan, ExtractionStrategy};
    use crate::state::ChunkTaskInfo;
    use crate::tuning::SourceTuning;

    /// Chunked plan over `orders.id` (Postgres source, local Parquet output)
    /// shared by the tests below.
    fn chunked_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT * FROM orders".into(),
            strategy: ExtractionStrategy::Chunked(ChunkedPlan {
                column: "id".into(),
                chunk_size: 100,
                chunk_count: None,
                parallel: 1,
                dense: false,
                by_days: None,
                checkpoint: true,
                max_attempts: 3,
            }),
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// Builds a chunk task with one attempt, no error and no output file.
    fn task(idx: i64, start: &str, end: &str, status: &str, rows: Option<i64>) -> ChunkTaskInfo {
        ChunkTaskInfo {
            chunk_index: idx,
            start_key: start.into(),
            end_key: end.into(),
            status: status.into(),
            attempts: 1,
            last_error: None,
            rows_written: rows,
            file_name: None,
        }
    }

    #[test]
    fn matches_and_mismatches_are_classified() {
        let plan = chunked_plan();
        let tasks = vec![
            task(0, "1", "100", "completed", Some(42)),
            task(1, "101", "200", "completed", Some(30)),
        ];
        let mut n = 0;
        // First chunk's source count matches the export; the second does not.
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            n += 1;
            Ok(Some(if n == 1 { 42 } else { 33 }))
        })
        .unwrap();

        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Match);
        assert_eq!(parts[1].status, crate::plan::PartitionStatus::Mismatch);
        assert_eq!(parts[1].source_count, Some(33));
        assert_eq!(parts[1].exported_count, Some(30));
    }

    #[test]
    fn unfinished_task_is_unknown_and_does_not_hide_source_count() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "1", "100", "failed", None)];
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| Ok(Some(42))).unwrap();
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Unknown);
        assert_eq!(parts[0].source_count, Some(42));
        assert_eq!(parts[0].exported_count, None);
    }

    #[test]
    fn unparseable_chunk_keys_are_unknown_without_source_lookup() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "alpha", "omega", "completed", Some(5))];
        let mut called = false;
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            called = true;
            Ok(Some(99))
        })
        .unwrap();
        assert!(
            !called,
            "reconcile must skip source count for unparseable chunk keys"
        );
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Unknown);
        assert_eq!(parts[0].exported_count, Some(5));
    }

    #[test]
    fn chunk_query_passes_through_chunked_math() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "10", "20", "completed", Some(5))];
        let mut captured = String::new();
        reconcile_chunked_tasks(&plan, &tasks, |q| {
            captured = q.to_string();
            Ok(Some(5))
        })
        .unwrap();
        assert!(captured.contains("BETWEEN 10 AND 20"), "got: {captured}");
        // The chunk column must be emitted as a quoted identifier.
        assert!(
            captured.contains("\"id\""),
            "identifier must be quoted: {captured}"
        );
    }

    /// Only the Chunked strategy has per-chunk partitions. Any other strategy
    /// must be rejected before the source is queried.
    #[test]
    fn non_chunked_strategy_is_rejected_without_source_lookup() {
        let mut plan = chunked_plan();
        plan.strategy = ExtractionStrategy::Snapshot;
        let tasks = vec![task(0, "1", "100", "completed", Some(1))];
        let mut called = false;
        let res = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            called = true;
            Ok(Some(1))
        });
        assert!(res.is_err(), "non-chunked strategy must be rejected");
        assert!(!called, "no source lookup may happen for a rejected plan");
    }

    /// An empty task list yields an empty result rather than an error.
    #[test]
    fn empty_task_list_yields_no_partitions() {
        let plan = chunked_plan();
        let parts = reconcile_chunked_tasks(&plan, &[], |_q| Ok(Some(1))).unwrap();
        assert!(parts.is_empty());
    }

    /// The first source-count error aborts the whole reconcile; later chunks
    /// are not queried.
    #[test]
    fn source_count_error_aborts_remaining_chunks() {
        let plan = chunked_plan();
        let tasks = vec![
            task(0, "1", "100", "completed", Some(1)),
            task(1, "101", "200", "completed", Some(1)),
        ];
        let mut calls = 0;
        let res = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            calls += 1;
            Err(anyhow::anyhow!("source unavailable"))
        });
        assert!(res.is_err(), "source error must propagate");
        assert_eq!(calls, 1, "first source error must stop the loop");
    }
}