1use std::path::Path;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::time::Duration;
4
5use super::RunSummary;
6use super::retry::classify_error;
7use super::sink::ExportSink;
8use super::validate::validate_output;
9use crate::config::{ExportConfig, SourceConfig};
10use crate::error::Result;
11use crate::preflight::chunk_sparsity_from_counts;
12use crate::source::{self, Source};
13use crate::state::StateStore;
14use crate::tuning::SourceTuning;
15use crate::{destination, format, resource};
16
/// Split the inclusive key range `min..=max` into consecutive inclusive
/// windows of at most `chunk_size` keys each; the last window ends at `max`.
///
/// Returns an empty vector when the range is empty (`max < min`) or when
/// `chunk_size` is not positive. Arithmetic saturates, so ranges that touch
/// `i64::MIN` / `i64::MAX` terminate instead of overflowing.
pub fn generate_chunks(min: i64, max: i64, chunk_size: i64) -> Vec<(i64, i64)> {
    if max < min || chunk_size <= 0 {
        return vec![];
    }
    let mut chunks = Vec::new();
    let mut start = min;
    loop {
        // `chunk_size - 1` cannot overflow because `chunk_size > 0`.
        let end = start.saturating_add(chunk_size - 1).min(max);
        chunks.push((start, end));
        if end >= max {
            break;
        }
        // Here `end < max <= i64::MAX`, so `end + 1` cannot overflow.
        start = end + 1;
    }
    chunks
}
32
/// Name of the synthetic ordinal column that dense (`ROW_NUMBER()`) chunk
/// queries add to every result row.
pub(crate) const RIVET_CHUNK_RN_COL: &str = "_rivet_chunk_rn";

/// Build the SQL text for one chunk window over `base_query`.
///
/// * Range mode (`chunk_dense == false`): wraps `base_query` and keeps rows
///   whose `order_column` value lies in `start..=end` (`BETWEEN`).
/// * Dense mode (`chunk_dense == true`): numbers the rows of `base_query` with
///   `ROW_NUMBER() OVER (ORDER BY order_column)` and keeps ordinals
///   `start..=end`. The result carries the extra [`RIVET_CHUNK_RN_COL`] column.
///
/// Inputs are interpolated verbatim; `base_query` and `order_column` come from
/// the export configuration.
pub(crate) fn build_chunk_query_sql(
    base_query: &str,
    order_column: &str,
    start: i64,
    end: i64,
    chunk_dense: bool,
) -> String {
    if !chunk_dense {
        return format!(
            "SELECT * FROM ({base_query}) AS _rivet WHERE {order_column} BETWEEN {start} AND {end}"
        );
    }
    let rn = RIVET_CHUNK_RN_COL;
    format!(
        "SELECT * FROM (SELECT _rivet_i.*, ROW_NUMBER() OVER (ORDER BY _rivet_i.{order_column}) AS {rn} FROM ({base_query}) AS _rivet_i) AS _rivet_w WHERE _rivet_w.{rn} BETWEEN {start} AND {end}"
    )
}
62
63pub(crate) fn chunk_plan_fingerprint(
64 base_query: &str,
65 chunk_column: &str,
66 chunk_size: usize,
67 chunk_dense: bool,
68) -> String {
69 use xxhash_rust::xxh3::xxh3_64;
70 let mut buf = String::with_capacity(base_query.len() + chunk_column.len() + 32);
71 buf.push_str(base_query);
72 buf.push('\x1f');
73 buf.push_str(chunk_column);
74 buf.push('\x1f');
75 buf.push_str(&chunk_size.to_string());
76 buf.push('\x1f');
77 buf.push_str(if chunk_dense { "dense_rn" } else { "range" });
78 format!("{:016x}", xxh3_64(buf.as_bytes()))
79}
80
81fn parse_scalar_i64(raw: &str) -> Result<i64> {
84 let t = raw.trim();
85 t.parse::<i64>()
86 .or_else(|_| t.parse::<f64>().map(|x| x as i64))
87 .map_err(|_| anyhow::anyhow!("invalid numeric scalar: {:?}", t))
88}
89
90fn query_wrapped_row_count(src: &mut dyn Source, base_query: &str) -> Result<i64> {
91 let sql = format!("SELECT COUNT(*) FROM ({}) AS _rivet_rowcnt", base_query);
92 let raw = src
93 .query_scalar(&sql)?
94 .ok_or_else(|| anyhow::anyhow!("COUNT(*) returned no row"))?;
95 parse_scalar_i64(&raw)
96}
97
98fn log_chunk_boundaries_list(export_name: &str, chunks: &[(i64, i64)]) {
99 const HEAD: usize = 8;
100 const TAIL: usize = 8;
101 if chunks.is_empty() {
102 log::info!(
103 "export '{}': no BETWEEN windows (empty key range)",
104 export_name
105 );
106 return;
107 }
108 if chunks.len() <= HEAD + TAIL {
109 for (i, (a, b)) in chunks.iter().enumerate() {
110 log::info!("export '{}': [{:>5}] {} .. {}", export_name, i, a, b);
111 }
112 } else {
113 for (i, (a, b)) in chunks.iter().enumerate().take(HEAD) {
114 log::info!("export '{}': [{:>5}] {} .. {}", export_name, i, a, b);
115 }
116 log::info!(
117 "export '{}': ... {} windows omitted ...",
118 export_name,
119 chunks.len() - HEAD - TAIL
120 );
121 for (i, (a, b)) in chunks.iter().enumerate().skip(chunks.len() - TAIL) {
122 log::info!("export '{}': [{:>5}] {} .. {}", export_name, i, a, b);
123 }
124 }
125}
126
127fn log_chunk_sparsity_at_run(
128 export_name: &str,
129 chunk_column: &str,
130 chunk_size: usize,
131 min_val: i64,
132 max_val: i64,
133 chunks: &[(i64, i64)],
134 row_count: i64,
135) {
136 if row_count == 0 {
137 log::info!(
138 "export '{}': COUNT(*) = 0 — no rows in export query; {} BETWEEN window(s) from `{}` min..max (runs will skip empty chunks)",
139 export_name,
140 chunks.len(),
141 chunk_column
142 );
143 if !chunks.is_empty() && chunks.len() <= 24 {
144 log_chunk_boundaries_list(export_name, chunks);
145 }
146 return;
147 }
148
149 let info = chunk_sparsity_from_counts(row_count, min_val, max_val, chunk_size);
150 if info.is_sparse {
151 let fill_pct = info.density * 100.0;
152 let empty_hint = (1.0 - info.density).clamp(0.0, 1.0) * 100.0;
153 log::info!(
154 "export '{}': sparse `{}` range — ~{:.2}% of the min..max ID band contains rows (~{:.1}% of logical windows likely empty). \
155 rows={}, span≈{}, chunk_size={}, ~{} logical windows, {} BETWEEN chunks. Computed boundaries:",
156 export_name,
157 chunk_column,
158 fill_pct,
159 empty_hint,
160 info.row_count,
161 info.range_span,
162 chunk_size,
163 info.logical_windows,
164 chunks.len(),
165 );
166 log_chunk_boundaries_list(export_name, chunks);
167 } else {
168 log::info!(
169 "export '{}': `{}` range looks dense enough for BETWEEN chunking (rows={}, span≈{}, density≈{:.6}, {} chunks); continuing",
170 export_name,
171 chunk_column,
172 info.row_count,
173 info.range_span,
174 info.density,
175 chunks.len(),
176 );
177 }
178}
179
180pub(crate) fn detect_and_generate_chunks(
181 src: &mut dyn Source,
182 base_query: &str,
183 chunk_column: &str,
184 chunk_size: usize,
185 export_name: &str,
186 chunk_dense: bool,
187) -> Result<Vec<(i64, i64)>> {
188 if chunk_dense {
189 let row_count = query_wrapped_row_count(src, base_query)?;
190 log::info!(
191 "export '{}': chunk_dense: ROW_NUMBER() OVER (ORDER BY `{}`) — {} row(s), chunk_size={}",
192 export_name,
193 chunk_column,
194 row_count,
195 chunk_size
196 );
197 let chunks = if row_count <= 0 {
198 vec![]
199 } else {
200 generate_chunks(1, row_count, chunk_size as i64)
201 };
202 log::info!(
203 "export '{}': dense chunk plan: {} window(s) on ordinal 1..{}",
204 export_name,
205 chunks.len(),
206 row_count
207 );
208 return Ok(chunks);
209 }
210
211 let min_sql = format!(
212 "SELECT min({col}) FROM ({q}) AS _rivet",
213 col = chunk_column,
214 q = base_query,
215 );
216 let max_sql = format!(
217 "SELECT max({col}) FROM ({q}) AS _rivet",
218 col = chunk_column,
219 q = base_query,
220 );
221
222 let min_val = src
223 .query_scalar(&min_sql)?
224 .and_then(|s| s.trim().parse::<i64>().ok())
225 .unwrap_or(0);
226 let max_val = src
227 .query_scalar(&max_sql)?
228 .and_then(|s| s.trim().parse::<i64>().ok())
229 .unwrap_or(0);
230
231 log::info!(
232 "export '{}': chunk_column `{}` range {} .. {} (chunk_size={})",
233 export_name,
234 chunk_column,
235 min_val,
236 max_val,
237 chunk_size
238 );
239
240 let chunks = generate_chunks(min_val, max_val, chunk_size as i64);
241
242 match query_wrapped_row_count(src, base_query) {
243 Ok(row_count) => {
244 log_chunk_sparsity_at_run(
245 export_name,
246 chunk_column,
247 chunk_size,
248 min_val,
249 max_val,
250 &chunks,
251 row_count,
252 );
253 }
254 Err(e) => {
255 log::warn!(
256 "export '{}': could not run COUNT(*) for sparsity diagnostics: {:#}; proceeding with {} windows from min/max only",
257 export_name,
258 e,
259 chunks.len()
260 );
261 if chunks.len() <= 24 {
262 log_chunk_boundaries_list(export_name, &chunks);
263 } else {
264 log::info!(
265 "export '{}': use `RUST_LOG=info rivet run` after fixing COUNT if you need the full boundary list",
266 export_name
267 );
268 }
269 }
270 }
271
272 Ok(chunks)
273}
274
275pub(crate) fn run_chunked_sequential(
278 src: &mut dyn Source,
279 base_query: &str,
280 export: &ExportConfig,
281 tuning: &SourceTuning,
282 validate: bool,
283 summary: &mut RunSummary,
284 state: Option<&StateStore>,
285) -> Result<()> {
286 let col = export
287 .chunk_column
288 .as_deref()
289 .expect("chunk_column required for chunked mode");
290 let chunks = detect_and_generate_chunks(
291 src,
292 base_query,
293 col,
294 export.chunk_size,
295 &export.name,
296 export.chunk_dense,
297 )?;
298
299 log::info!(
300 "export '{}': {} chunks to process sequentially",
301 export.name,
302 chunks.len()
303 );
304
305 for (i, (start, end)) in chunks.iter().enumerate() {
306 if !resource::check_memory(tuning.memory_threshold_mb) {
307 log::warn!("memory threshold exceeded, pausing 5s before chunk {}", i);
308 std::thread::sleep(Duration::from_secs(5));
309 }
310
311 let chunk_query = build_chunk_query_sql(base_query, col, *start, *end, export.chunk_dense);
312 log::info!(
313 "export '{}': chunk {}/{} ({}..{})",
314 export.name,
315 i + 1,
316 chunks.len(),
317 start,
318 end
319 );
320
321 let mut sink = ExportSink::new(export)?;
322 src.export(&chunk_query, None, None, tuning, &mut sink)?;
323 if let Some(w) = sink.writer.take() {
324 w.finish()?;
325 }
326
327 summary.total_rows += sink.total_rows as i64;
328 log::info!(
329 "export '{}': chunk {} -- {} rows",
330 export.name,
331 i + 1,
332 sink.total_rows
333 );
334
335 if sink.total_rows > 0 {
336 if validate {
337 validate_output(sink.tmp.path(), export.format, sink.total_rows)?;
338 summary.validated = Some(true);
339 }
340 let file_bytes = std::fs::metadata(sink.tmp.path())
341 .map(|m| m.len())
342 .unwrap_or(0);
343 summary.bytes_written += file_bytes;
344 summary.files_produced += 1;
345
346 let fmt =
347 format::create_format(export.format, export.compression, export.compression_level);
348 let file_name = format!(
349 "{}_{}_chunk{}.{}",
350 export.name,
351 chrono::Utc::now().format("%Y%m%d_%H%M%S"),
352 i,
353 fmt.file_extension()
354 );
355 let dest = destination::create_destination(&export.destination)?;
356 dest.write(sink.tmp.path(), &file_name)?;
357
358 if let Some(st) = state {
359 let _ = st.record_file(
360 &summary.run_id,
361 &export.name,
362 &file_name,
363 sink.total_rows as i64,
364 file_bytes as i64,
365 &format!("{:?}", export.format).to_lowercase(),
366 Some(&format!("{:?}", export.compression).to_lowercase()),
367 );
368 }
369 }
370 }
371
372 log::info!("export '{}': all chunks completed", export.name);
373 Ok(())
374}
375
376#[allow(clippy::too_many_arguments)]
379pub(super) fn run_chunked_parallel(
380 source_config: &SourceConfig,
381 state: &StateStore,
382 export: &ExportConfig,
383 tuning: &SourceTuning,
384 config_dir: &Path,
385 validate: bool,
386 summary: &mut RunSummary,
387 params: Option<&std::collections::HashMap<String, String>>,
388) -> Result<()> {
389 let base_query = export.resolve_query(config_dir, params)?;
390 let col = export
391 .chunk_column
392 .as_deref()
393 .expect("chunk_column required for chunked mode");
394
395 let mut src = source::create_source(source_config)?;
396 let chunks = detect_and_generate_chunks(
397 &mut *src,
398 &base_query,
399 col,
400 export.chunk_size,
401 &export.name,
402 export.chunk_dense,
403 )?;
404 drop(src);
405
406 let total_chunks = chunks.len();
407 let parallel = export.parallel.min(total_chunks);
408 log::info!(
409 "export '{}': {} chunks, {} parallel threads",
410 export.name,
411 total_chunks,
412 parallel
413 );
414
415 let completed = AtomicUsize::new(0);
416 let agg_rows = std::sync::atomic::AtomicI64::new(0);
417 let agg_bytes = std::sync::atomic::AtomicU64::new(0);
418 let agg_files = AtomicUsize::new(0);
419 let errors = std::sync::Mutex::new(Vec::<String>::new());
420 let file_records: std::sync::Mutex<Vec<(String, i64, i64)>> = std::sync::Mutex::new(Vec::new());
421 let semaphore = AtomicUsize::new(0);
422
423 std::thread::scope(|s| {
424 for (i, (start, end)) in chunks.iter().enumerate() {
425 while semaphore.load(Ordering::Relaxed) >= parallel {
426 std::thread::sleep(Duration::from_millis(50));
427 }
428
429 if !resource::check_memory(tuning.memory_threshold_mb) {
430 log::warn!("memory threshold exceeded, waiting before chunk {}", i);
431 while !resource::check_memory(tuning.memory_threshold_mb) {
432 std::thread::sleep(Duration::from_secs(2));
433 }
434 }
435
436 semaphore.fetch_add(1, Ordering::Relaxed);
437
438 let source_config = source_config.clone();
439 let tuning = tuning.clone();
440 let export_for_sink = export.clone();
441 let export_name = &export.name;
442 let format_type = export.format;
443 let dest_config = &export.destination;
444 let base_query = &base_query;
445 let completed = &completed;
446 let agg_rows = &agg_rows;
447 let agg_bytes = &agg_bytes;
448 let agg_files = &agg_files;
449 let errors = &errors;
450 let file_records = &file_records;
451 let semaphore = &semaphore;
452 let start = *start;
453 let end = *end;
454
455 s.spawn(move || {
456 let result = (|| -> Result<()> {
457 let chunk_query = build_chunk_query_sql(
458 base_query,
459 col,
460 start,
461 end,
462 export_for_sink.chunk_dense,
463 );
464
465 let mut thread_src = source::create_source(&source_config)?;
466 let mut sink = ExportSink::new(&export_for_sink)?;
467 thread_src.export(&chunk_query, None, None, &tuning, &mut sink)?;
468 if let Some(w) = sink.writer.take() {
469 w.finish()?;
470 }
471
472 agg_rows.fetch_add(sink.total_rows as i64, Ordering::Relaxed);
473
474 if sink.total_rows > 0 {
475 if validate {
476 validate_output(sink.tmp.path(), format_type, sink.total_rows)?;
477 }
478 let file_bytes = std::fs::metadata(sink.tmp.path())
479 .map(|m| m.len())
480 .unwrap_or(0);
481 agg_bytes.fetch_add(file_bytes, Ordering::Relaxed);
482 agg_files.fetch_add(1, Ordering::Relaxed);
483
484 let fmt = format::create_format(
485 format_type,
486 export_for_sink.compression,
487 export_for_sink.compression_level,
488 );
489 let file_name = format!(
490 "{}_{}_chunk{}.{}",
491 export_name,
492 chrono::Utc::now().format("%Y%m%d_%H%M%S"),
493 i,
494 fmt.file_extension()
495 );
496 let dest = destination::create_destination(dest_config)?;
497 dest.write(sink.tmp.path(), &file_name)?;
498 file_records
499 .lock()
500 .unwrap_or_else(|e| e.into_inner())
501 .push((file_name, sink.total_rows as i64, file_bytes as i64));
502 }
503
504 let done = completed.fetch_add(1, Ordering::Relaxed) + 1;
505 log::info!(
506 "export '{}': chunk {}/{} done ({} rows)",
507 export_name,
508 done,
509 total_chunks,
510 sink.total_rows
511 );
512 Ok(())
513 })();
514
515 semaphore.fetch_sub(1, Ordering::Relaxed);
516
517 if let Err(e) = result {
518 log::error!("export '{}': chunk {} failed: {:#}", export_name, i, e);
519 errors
520 .lock()
521 .unwrap_or_else(|e| e.into_inner())
522 .push(format!("chunk {}: {:#}", i, e));
523 }
524 });
525 }
526 });
527
528 summary.total_rows = agg_rows.load(Ordering::Relaxed);
529 summary.bytes_written = agg_bytes.load(Ordering::Relaxed);
530 summary.files_produced = agg_files.load(Ordering::Relaxed);
531 if validate {
532 summary.validated = Some(true);
533 }
534
535 let fmt_name = format!("{:?}", export.format).to_lowercase();
536 let comp_name = format!("{:?}", export.compression).to_lowercase();
537 for (fname, rows, bytes) in file_records.into_inner().unwrap_or_else(|e| e.into_inner()) {
538 let _ = state.record_file(
539 &summary.run_id,
540 &export.name,
541 &fname,
542 rows,
543 bytes,
544 &fmt_name,
545 Some(&comp_name),
546 );
547 }
548
549 let errs = errors.into_inner().unwrap_or_else(|e| e.into_inner());
550 if !errs.is_empty() {
551 anyhow::bail!(
552 "export '{}': {} chunks failed:\n{}",
553 export.name,
554 errs.len(),
555 errs.join("\n")
556 );
557 }
558
559 log::info!(
560 "export '{}': all {} chunks completed",
561 export.name,
562 total_chunks
563 );
564 Ok(())
565}
566
567fn chunk_max_attempts_for_export(export: &ExportConfig, tuning: &SourceTuning) -> u32 {
570 export
571 .chunk_max_attempts
572 .unwrap_or_else(|| tuning.max_retries.saturating_add(1).max(1))
573}
574
575#[allow(clippy::too_many_arguments)]
576fn ensure_chunk_checkpoint_plan(
577 state: &StateStore,
578 export: &ExportConfig,
579 summary: &mut RunSummary,
580 base_query: &str,
581 col: &str,
582 chunks: &[(i64, i64)],
583 resume: bool,
584 tuning: &SourceTuning,
585) -> Result<String> {
586 let plan_hash = chunk_plan_fingerprint(base_query, col, export.chunk_size, export.chunk_dense);
587 let max_att = chunk_max_attempts_for_export(export, tuning);
588
589 if resume {
590 let Some((rid, stored_hash)) = state.find_in_progress_chunk_run(&export.name)? else {
591 anyhow::bail!(
592 "export '{}': --resume but no in-progress chunk checkpoint; run without --resume first or `rivet state reset-chunks --export {}`",
593 export.name,
594 export.name
595 );
596 };
597 if stored_hash != plan_hash {
598 anyhow::bail!(
599 "export '{}': chunk plan fingerprint mismatch (query, chunk_column, chunk_size, or chunk_dense changed); cannot resume",
600 export.name
601 );
602 }
603 summary.run_id = rid.clone();
604 let n = state.reset_stale_running_chunk_tasks(&rid)?;
605 if n > 0 {
606 log::warn!(
607 "export '{}': reset {} stale 'running' chunk task(s) after resume",
608 export.name,
609 n
610 );
611 }
612 return Ok(rid);
613 }
614
615 if let Some((rid, _)) = state.find_in_progress_chunk_run(&export.name)? {
616 anyhow::bail!(
617 "export '{}': chunk checkpoint run '{}' still in progress; use `rivet run --resume` or `rivet state reset-chunks --export {}`",
618 export.name,
619 rid,
620 export.name
621 );
622 }
623
624 state.create_chunk_run(&summary.run_id, &export.name, &plan_hash, max_att)?;
625 state.insert_chunk_tasks(&summary.run_id, chunks)?;
626 log::info!(
627 "export '{}': chunk checkpoint: {} tasks saved (run_id={})",
628 export.name,
629 chunks.len(),
630 summary.run_id
631 );
632 Ok(summary.run_id.clone())
633}
634
635#[allow(clippy::too_many_arguments)]
636fn export_one_chunk_range(
637 src: &mut dyn Source,
638 base_query: &str,
639 col: &str,
640 start: i64,
641 end: i64,
642 chunk_index: i64,
643 export: &ExportConfig,
644 tuning: &SourceTuning,
645 validate: bool,
646) -> Result<(usize, Option<String>, u64)> {
647 let chunk_query = build_chunk_query_sql(base_query, col, start, end, export.chunk_dense);
648
649 let mut sink = ExportSink::new(export)?;
650 src.export(&chunk_query, None, None, tuning, &mut sink)?;
651 if let Some(w) = sink.writer.take() {
652 w.finish()?;
653 }
654
655 if sink.total_rows == 0 {
656 return Ok((0, None, 0));
657 }
658
659 if validate {
660 validate_output(sink.tmp.path(), export.format, sink.total_rows)?;
661 }
662 let file_bytes = std::fs::metadata(sink.tmp.path())
663 .map(|m| m.len())
664 .unwrap_or(0);
665
666 let fmt = format::create_format(export.format, export.compression, export.compression_level);
667 let file_name = format!(
668 "{}_{}_chunk{}.{}",
669 export.name,
670 chrono::Utc::now().format("%Y%m%d_%H%M%S"),
671 chunk_index,
672 fmt.file_extension()
673 );
674 let dest = destination::create_destination(&export.destination)?;
675 dest.write(sink.tmp.path(), &file_name)?;
676
677 Ok((sink.total_rows, Some(file_name), file_bytes))
678}
679
680#[allow(clippy::too_many_arguments)]
681fn run_chunk_with_source_retries(
682 source_config: &SourceConfig,
683 base_query: &str,
684 col: &str,
685 start: i64,
686 end: i64,
687 chunk_index: i64,
688 export: &ExportConfig,
689 tuning: &SourceTuning,
690 validate: bool,
691) -> Result<(usize, Option<String>, u64)> {
692 let mut last_err: Option<anyhow::Error> = None;
693 for attempt in 0..=tuning.max_retries {
694 if attempt > 0 {
695 let (_, _needs_reconnect, extra_delay) = last_err
696 .as_ref()
697 .map(classify_error)
698 .unwrap_or((false, false, 0));
699 let backoff = tuning.retry_backoff_ms * 2u64.pow(attempt - 1) + extra_delay;
700 log::warn!(
701 "export '{}': chunk {} retry {}/{} in {}ms",
702 export.name,
703 chunk_index,
704 attempt,
705 tuning.max_retries,
706 backoff
707 );
708 std::thread::sleep(Duration::from_millis(backoff));
709 }
710
711 let mut src = match source::create_source(source_config) {
712 Ok(s) => s,
713 Err(e) => {
714 let (transient, _, _) = classify_error(&e);
715 if attempt < tuning.max_retries && transient {
716 last_err = Some(e);
717 continue;
718 }
719 return Err(e);
720 }
721 };
722
723 match export_one_chunk_range(
724 &mut *src,
725 base_query,
726 col,
727 start,
728 end,
729 chunk_index,
730 export,
731 tuning,
732 validate,
733 ) {
734 Ok(v) => return Ok(v),
735 Err(e) => {
736 let (transient, _, _) = classify_error(&e);
737 if attempt < tuning.max_retries && transient {
738 last_err = Some(e);
739 continue;
740 }
741 return Err(e);
742 }
743 }
744 }
745 Err(last_err.unwrap_or_else(|| anyhow::anyhow!("chunk export failed after retries")))
746}
747
748#[allow(clippy::too_many_arguments)]
749pub(crate) fn run_chunked_sequential_checkpoint(
750 src: &mut dyn Source,
751 source_config: &SourceConfig,
752 state: &StateStore,
753 base_query: &str,
754 export: &ExportConfig,
755 tuning: &SourceTuning,
756 validate: bool,
757 summary: &mut RunSummary,
758 st: Option<&StateStore>,
759 resume: bool,
760 config_path: &str,
761) -> Result<()> {
762 let _ = config_path;
763 let col = export
764 .chunk_column
765 .as_deref()
766 .expect("chunk_column required for chunked mode");
767
768 let chunks = if resume {
769 vec![]
770 } else {
771 detect_and_generate_chunks(
772 src,
773 base_query,
774 col,
775 export.chunk_size,
776 &export.name,
777 export.chunk_dense,
778 )?
779 };
780
781 let run_id = ensure_chunk_checkpoint_plan(
782 state, export, summary, base_query, col, &chunks, resume, tuning,
783 )?;
784
785 if !resume && !resource::check_memory(tuning.memory_threshold_mb) {
786 log::warn!("memory threshold exceeded before chunk export; pausing 5s");
787 std::thread::sleep(Duration::from_secs(5));
788 }
789
790 while let Some((chunk_index, sk, ek)) = state.claim_next_chunk_task(&run_id)? {
791 if !resource::check_memory(tuning.memory_threshold_mb) {
792 log::warn!(
793 "memory threshold exceeded, pausing 5s before chunk {}",
794 chunk_index
795 );
796 std::thread::sleep(Duration::from_secs(5));
797 }
798
799 let start: i64 = sk
800 .parse()
801 .map_err(|_| anyhow::anyhow!("chunk {}: invalid start_key {:?}", chunk_index, sk))?;
802 let end: i64 = ek
803 .parse()
804 .map_err(|_| anyhow::anyhow!("chunk {}: invalid end_key {:?}", chunk_index, ek))?;
805
806 log::info!(
807 "export '{}': checkpoint chunk {} ({}..{})",
808 export.name,
809 chunk_index,
810 start,
811 end
812 );
813
814 match run_chunk_with_source_retries(
815 source_config,
816 base_query,
817 col,
818 start,
819 end,
820 chunk_index,
821 export,
822 tuning,
823 validate,
824 ) {
825 Ok((rows, fname, file_bytes)) => {
826 summary.total_rows += rows as i64;
827 if rows > 0 {
828 summary.bytes_written += file_bytes;
829 summary.files_produced += 1;
830 if let Some(name) = &fname
831 && let Some(store) = st
832 {
833 let _ = store.record_file(
834 &summary.run_id,
835 &export.name,
836 name,
837 rows as i64,
838 file_bytes as i64,
839 &format!("{:?}", export.format).to_lowercase(),
840 Some(&format!("{:?}", export.compression).to_lowercase()),
841 );
842 }
843 }
844 state.complete_chunk_task(&run_id, chunk_index, rows as i64, fname.as_deref())?;
845 }
846 Err(e) => {
847 let msg = format!("{:#}", e);
848 log::error!(
849 "export '{}': chunk {} failed: {}",
850 export.name,
851 chunk_index,
852 msg
853 );
854 state.fail_chunk_task(&run_id, chunk_index, &msg)?;
855 }
856 }
857 }
858
859 let pending = state.count_chunk_tasks_not_completed(&run_id)?;
860 if pending > 0 {
861 anyhow::bail!(
862 "export '{}': chunk checkpoint incomplete ({} tasks not completed); fix errors and `rivet run --resume` or `rivet state reset-chunks`",
863 export.name,
864 pending
865 );
866 }
867
868 state.finalize_chunk_run_completed(&run_id)?;
869 log::info!(
870 "export '{}': chunk checkpoint run completed (run_id={})",
871 export.name,
872 run_id
873 );
874 Ok(())
875}
876
877#[allow(clippy::too_many_arguments)]
878pub(super) fn run_chunked_parallel_checkpoint(
879 config_path: &str,
880 source_config: &SourceConfig,
881 state: &StateStore,
882 export: &ExportConfig,
883 tuning: &SourceTuning,
884 config_dir: &Path,
885 validate: bool,
886 summary: &mut RunSummary,
887 params: Option<&std::collections::HashMap<String, String>>,
888 resume: bool,
889) -> Result<()> {
890 let base_query = export.resolve_query(config_dir, params)?;
891 let col = export
892 .chunk_column
893 .as_deref()
894 .expect("chunk_column required for chunked mode");
895
896 let chunks = if resume {
897 vec![]
898 } else {
899 let mut src = source::create_source(source_config)?;
900 detect_and_generate_chunks(
901 &mut *src,
902 &base_query,
903 col,
904 export.chunk_size,
905 &export.name,
906 export.chunk_dense,
907 )?
908 };
909
910 let run_id = ensure_chunk_checkpoint_plan(
911 state,
912 export,
913 summary,
914 &base_query,
915 col,
916 &chunks,
917 resume,
918 tuning,
919 )?;
920
921 let total_tasks = {
922 let tasks = state.list_chunk_tasks_for_run(&run_id)?;
923 tasks.len().max(1)
924 };
925 let parallel = export.parallel.min(total_tasks);
926 log::info!(
927 "export '{}': chunk checkpoint parallel: {} workers, run_id={}",
928 export.name,
929 parallel,
930 run_id
931 );
932
933 let db_path = state.database_path().to_path_buf();
934 let run_id_arc = std::sync::Arc::new(run_id.clone());
935 let agg_rows = std::sync::atomic::AtomicI64::new(0);
936 let agg_bytes = std::sync::atomic::AtomicU64::new(0);
937 let agg_files = std::sync::atomic::AtomicUsize::new(0);
938 let errors = std::sync::Mutex::new(Vec::<String>::new());
939
940 let export_name = export.name.clone();
941 let export_for_workers = export.clone();
942 let format_type = export.format;
943 let dest_config = export.destination.clone();
944 let tuning_cl = tuning.clone();
945 let source_config_cl = source_config.clone();
946 let base_query_cl = base_query.clone();
947 let col_owned = col.to_string();
948 let config_path_owned = config_path.to_string();
949 let fmt_label = format!("{:?}", export.format).to_lowercase();
950 let comp_label = format!("{:?}", export.compression).to_lowercase();
951
952 std::thread::scope(|s| {
953 for _ in 0..parallel {
954 let db_path = db_path.clone();
955 let run_id_arc = std::sync::Arc::clone(&run_id_arc);
956 let agg_rows = &agg_rows;
957 let agg_bytes = &agg_bytes;
958 let agg_files = &agg_files;
959 let errors = &errors;
960 let export_name = export_name.clone();
961 let export_worker = export_for_workers.clone();
962 let tuning_w = tuning_cl.clone();
963 let source_config_w = source_config_cl.clone();
964 let base_query_w = base_query_cl.clone();
965 let col_w = col_owned.clone();
966 let dest_config = dest_config.clone();
967 let config_path_w = config_path_owned.clone();
968 let fmt_label_w = fmt_label.clone();
969 let comp_label_w = comp_label.clone();
970
971 s.spawn(move || {
972 loop {
973 let claimed = match StateStore::claim_next_chunk_task_at_path(
974 &db_path,
975 run_id_arc.as_str(),
976 ) {
977 Ok(c) => c,
978 Err(e) => {
979 errors
980 .lock()
981 .unwrap_or_else(|e| e.into_inner())
982 .push(format!("claim error: {:#}", e));
983 break;
984 }
985 };
986 let Some((chunk_index, sk, ek)) = claimed else {
987 break;
988 };
989
990 if !resource::check_memory(tuning_w.memory_threshold_mb) {
991 log::warn!("memory threshold exceeded in worker; pausing 2s");
992 std::thread::sleep(Duration::from_secs(2));
993 }
994
995 let start: i64 = match sk.parse() {
996 Ok(v) => v,
997 Err(_) => {
998 let _ = StateStore::fail_chunk_task_at_path(
999 &db_path,
1000 run_id_arc.as_str(),
1001 chunk_index,
1002 "invalid start_key",
1003 );
1004 continue;
1005 }
1006 };
1007 let end: i64 = match ek.parse() {
1008 Ok(v) => v,
1009 Err(_) => {
1010 let _ = StateStore::fail_chunk_task_at_path(
1011 &db_path,
1012 run_id_arc.as_str(),
1013 chunk_index,
1014 "invalid end_key",
1015 );
1016 continue;
1017 }
1018 };
1019
1020 let chunk_query = build_chunk_query_sql(
1021 &base_query_w,
1022 &col_w,
1023 start,
1024 end,
1025 export_worker.chunk_dense,
1026 );
1027
1028 let result = (|| -> Result<(usize, Option<String>, u64)> {
1029 let mut last_err: Option<anyhow::Error> = None;
1030 for attempt in 0..=tuning_w.max_retries {
1031 if attempt > 0 {
1032 let (_, _, extra_delay) = last_err
1033 .as_ref()
1034 .map(classify_error)
1035 .unwrap_or((false, false, 0));
1036 let backoff =
1037 tuning_w.retry_backoff_ms * 2u64.pow(attempt - 1) + extra_delay;
1038 std::thread::sleep(Duration::from_millis(backoff));
1039 }
1040
1041 let mut thread_src = match source::create_source(&source_config_w) {
1042 Ok(s) => s,
1043 Err(e) => {
1044 let (transient, _, _) = classify_error(&e);
1045 if attempt < tuning_w.max_retries && transient {
1046 last_err = Some(e);
1047 continue;
1048 }
1049 return Err(e);
1050 }
1051 };
1052
1053 let mut sink = ExportSink::new(&export_worker)?;
1054
1055 let export_attempt = (|| -> Result<(usize, Option<String>, u64)> {
1056 thread_src.export(
1057 &chunk_query,
1058 None,
1059 None,
1060 &tuning_w,
1061 &mut sink,
1062 )?;
1063 if let Some(w) = sink.writer.take() {
1064 w.finish()?;
1065 }
1066 if sink.total_rows == 0 {
1067 return Ok((0, None, 0));
1068 }
1069 if validate {
1070 validate_output(sink.tmp.path(), format_type, sink.total_rows)?;
1071 }
1072 let file_bytes = std::fs::metadata(sink.tmp.path())
1073 .map(|m| m.len())
1074 .unwrap_or(0);
1075 let fmt = format::create_format(
1076 format_type,
1077 export_worker.compression,
1078 export_worker.compression_level,
1079 );
1080 let file_name = format!(
1081 "{}_{}_chunk{}.{}",
1082 export_name,
1083 chrono::Utc::now().format("%Y%m%d_%H%M%S"),
1084 chunk_index,
1085 fmt.file_extension()
1086 );
1087 let dest = destination::create_destination(&dest_config)?;
1088 dest.write(sink.tmp.path(), &file_name)?;
1089 Ok((sink.total_rows, Some(file_name), file_bytes))
1090 })();
1091
1092 match export_attempt {
1093 Ok(v) => return Ok(v),
1094 Err(e) => {
1095 let (transient, _, _) = classify_error(&e);
1096 if attempt < tuning_w.max_retries && transient {
1097 last_err = Some(e);
1098 continue;
1099 }
1100 return Err(e);
1101 }
1102 }
1103 }
1104 Err(last_err
1105 .unwrap_or_else(|| anyhow::anyhow!("chunk failed after retries")))
1106 })();
1107
1108 match result {
1109 Ok((rows, fname, file_bytes)) => {
1110 agg_rows.fetch_add(rows as i64, Ordering::Relaxed);
1111 if rows > 0 {
1112 agg_bytes.fetch_add(file_bytes, Ordering::Relaxed);
1113 agg_files.fetch_add(1, Ordering::Relaxed);
1114 if let (Some(name), Ok(store)) =
1115 (fname.as_ref(), StateStore::open(&config_path_w))
1116 {
1117 let _ = store.record_file(
1118 run_id_arc.as_str(),
1119 &export_name,
1120 name,
1121 rows as i64,
1122 file_bytes as i64,
1123 &fmt_label_w,
1124 Some(comp_label_w.as_str()),
1125 );
1126 }
1127 }
1128 let _ = StateStore::complete_chunk_task_at_path(
1129 &db_path,
1130 run_id_arc.as_str(),
1131 chunk_index,
1132 rows as i64,
1133 fname.as_deref(),
1134 );
1135 }
1136 Err(e) => {
1137 let msg = format!("{:#}", e);
1138 let _ = StateStore::fail_chunk_task_at_path(
1139 &db_path,
1140 run_id_arc.as_str(),
1141 chunk_index,
1142 &msg,
1143 );
1144 errors
1145 .lock()
1146 .unwrap_or_else(|e| e.into_inner())
1147 .push(format!("chunk {}: {}", chunk_index, msg));
1148 }
1149 }
1150 }
1151 });
1152 }
1153 });
1154
1155 summary.total_rows = agg_rows.load(Ordering::Relaxed);
1156 summary.bytes_written = agg_bytes.load(Ordering::Relaxed);
1157 summary.files_produced = agg_files.load(Ordering::Relaxed);
1158 if validate {
1159 summary.validated = Some(true);
1160 }
1161
1162 let errs = errors.into_inner().unwrap_or_else(|e| e.into_inner());
1163 if !errs.is_empty() {
1164 anyhow::bail!(
1165 "export '{}': parallel checkpoint worker errors:\n{}",
1166 export.name,
1167 errs.join("\n")
1168 );
1169 }
1170
1171 let pending = state.count_chunk_tasks_not_completed(&run_id)?;
1172 if pending > 0 {
1173 anyhow::bail!(
1174 "export '{}': {} chunk task(s) not completed; `rivet run --resume` or inspect `rivet state chunks --export {}`",
1175 export.name,
1176 pending,
1177 export.name
1178 );
1179 }
1180
1181 state.finalize_chunk_run_completed(&run_id)?;
1182 log::info!(
1183 "export '{}': chunk checkpoint parallel run completed",
1184 export.name
1185 );
1186 Ok(())
1187}
1188
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_generate_chunks() {
        let chunks = generate_chunks(1, 100, 30);
        assert_eq!(chunks, vec![(1, 30), (31, 60), (61, 90), (91, 100)]);
    }

    #[test]
    fn test_generate_chunks_exact() {
        let chunks = generate_chunks(0, 99, 50);
        assert_eq!(chunks, vec![(0, 49), (50, 99)]);
    }

    #[test]
    fn test_generate_chunks_single() {
        let chunks = generate_chunks(1, 10, 100);
        assert_eq!(chunks, vec![(1, 10)]);
    }

    #[test]
    fn test_generate_chunks_empty() {
        assert!(generate_chunks(10, 5, 100).is_empty());
    }

    /// A non-positive chunk size yields no windows instead of looping.
    #[test]
    fn test_generate_chunks_non_positive_size() {
        assert!(generate_chunks(1, 10, 0).is_empty());
        assert!(generate_chunks(1, 10, -3).is_empty());
    }

    /// Single-key ranges and ranges crossing zero are split correctly.
    #[test]
    fn test_generate_chunks_single_key_and_negative_range() {
        assert_eq!(generate_chunks(5, 5, 10), vec![(5, 5)]);
        assert_eq!(generate_chunks(-10, 5, 10), vec![(-10, -1), (0, 5)]);
    }

    #[test]
    fn test_build_chunk_query_range_mode() {
        let q = build_chunk_query_sql("SELECT id FROM t", "id", 1, 100, false);
        assert!(q.contains("WHERE id BETWEEN 1 AND 100"), "got: {}", q);
        assert!(!q.contains("ROW_NUMBER()"), "got: {}", q);
    }

    #[test]
    fn test_build_chunk_query_dense_mode() {
        let q = build_chunk_query_sql("SELECT id FROM t", "id", 1, 5000, true);
        assert!(q.contains("ROW_NUMBER()"), "got: {}", q);
        assert!(q.contains(RIVET_CHUNK_RN_COL), "got: {}", q);
        assert!(q.contains("BETWEEN 1 AND 5000"), "got: {}", q);
    }

    /// Pins the exact SQL text so refactors cannot silently change it.
    #[test]
    fn test_build_chunk_query_exact_sql() {
        assert_eq!(
            build_chunk_query_sql("SELECT id FROM t", "id", 1, 100, false),
            "SELECT * FROM (SELECT id FROM t) AS _rivet WHERE id BETWEEN 1 AND 100"
        );
        assert_eq!(
            build_chunk_query_sql("SELECT id FROM t", "id", 1, 5000, true),
            "SELECT * FROM (SELECT _rivet_i.*, ROW_NUMBER() OVER (ORDER BY _rivet_i.id) AS _rivet_chunk_rn FROM (SELECT id FROM t) AS _rivet_i) AS _rivet_w WHERE _rivet_w._rivet_chunk_rn BETWEEN 1 AND 5000"
        );
    }

    /// The fingerprint is stable for identical inputs and changes with each
    /// plan-defining input.
    #[test]
    fn test_chunk_plan_fingerprint_is_stable_and_plan_sensitive() {
        let base = chunk_plan_fingerprint("SELECT * FROM t", "id", 1000, false);
        assert_eq!(base.len(), 16);
        assert!(base.chars().all(|c| c.is_ascii_hexdigit()));
        assert_eq!(base, chunk_plan_fingerprint("SELECT * FROM t", "id", 1000, false));
        assert_ne!(base, chunk_plan_fingerprint("SELECT * FROM t", "id", 1000, true));
        assert_ne!(base, chunk_plan_fingerprint("SELECT * FROM t", "id", 2000, false));
        assert_ne!(base, chunk_plan_fingerprint("SELECT * FROM t", "pk", 1000, false));
        assert_ne!(base, chunk_plan_fingerprint("SELECT * FROM u", "id", 1000, false));
    }

    #[test]
    fn test_parse_scalar_i64_integers_and_whole_floats() {
        assert_eq!(parse_scalar_i64(" 42 ").unwrap(), 42);
        assert_eq!(parse_scalar_i64("-7").unwrap(), -7);
        assert_eq!(parse_scalar_i64("42.0").unwrap(), 42);
        assert_eq!(parse_scalar_i64("1e3").unwrap(), 1000);
        assert!(parse_scalar_i64("abc").is_err());
        assert!(parse_scalar_i64("").is_err());
    }
}