Skip to main content

rivet_cli/pipeline/
chunked.rs

1use std::path::Path;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::time::Duration;
4
5use super::RunSummary;
6use super::retry::classify_error;
7use super::sink::ExportSink;
8use super::validate::validate_output;
9use crate::config::{ExportConfig, SourceConfig};
10use crate::error::Result;
11use crate::preflight::chunk_sparsity_from_counts;
12use crate::source::{self, Source};
13use crate::state::StateStore;
14use crate::tuning::SourceTuning;
15use crate::{destination, format, resource};
16
17// ─── Chunk math ──────────────────────────────────────────────
18
/// Splits the inclusive key range `min..=max` into consecutive, non-overlapping
/// inclusive windows `(start, end)` that each cover at most `chunk_size` keys.
///
/// Returns an empty vector when `max < min` or when `chunk_size <= 0`.
/// The last window is clamped to `max`, so it may be shorter than `chunk_size`.
///
/// The arithmetic is overflow-safe: ranges that touch `i64::MAX` are clamped
/// rather than wrapping or panicking.
pub fn generate_chunks(min: i64, max: i64, chunk_size: i64) -> Vec<(i64, i64)> {
    if max < min || chunk_size <= 0 {
        return vec![];
    }
    let mut chunks = Vec::new();
    let mut start = min;
    loop {
        // `chunk_size >= 1` here, so `chunk_size - 1` cannot underflow; the
        // saturating add keeps windows near `i64::MAX` from overflowing.
        let end = start.saturating_add(chunk_size - 1).min(max);
        chunks.push((start, end));
        if end >= max {
            break;
        }
        // `end < max <= i64::MAX`, so this increment cannot overflow.
        start = end + 1;
    }
    chunks
}
32
/// Synthetic ordinal column for `chunk_dense`; stripped before writing files.
pub(crate) const RIVET_CHUNK_RN_COL: &str = "_rivet_chunk_rn";

/// Wraps `base_query` in a `SELECT` restricted to one chunk window.
///
/// * Range mode (`chunk_dense == false`): filters `order_column` with
///   `BETWEEN start AND end`.
/// * Dense mode (`chunk_dense == true`): numbers the rows with
///   `ROW_NUMBER() OVER (ORDER BY order_column)` under the alias
///   [`RIVET_CHUNK_RN_COL`] and filters on that ordinal instead.
///
/// `base_query` and `order_column` are spliced into the SQL text verbatim.
pub(crate) fn build_chunk_query_sql(
    base_query: &str,
    order_column: &str,
    start: i64,
    end: i64,
    chunk_dense: bool,
) -> String {
    if !chunk_dense {
        return format!(
            "SELECT * FROM ({base_query}) AS _rivet WHERE {order_column} BETWEEN {start} AND {end}"
        );
    }

    let rn = RIVET_CHUNK_RN_COL;
    format!(
        "SELECT * FROM (SELECT _rivet_i.*, ROW_NUMBER() OVER (ORDER BY _rivet_i.{order_column}) AS {rn} FROM ({base_query}) AS _rivet_i) AS _rivet_w WHERE _rivet_w.{rn} BETWEEN {start} AND {end}"
    )
}
62
63pub(crate) fn chunk_plan_fingerprint(
64    base_query: &str,
65    chunk_column: &str,
66    chunk_size: usize,
67    chunk_dense: bool,
68) -> String {
69    use xxhash_rust::xxh3::xxh3_64;
70    let mut buf = String::with_capacity(base_query.len() + chunk_column.len() + 32);
71    buf.push_str(base_query);
72    buf.push('\x1f');
73    buf.push_str(chunk_column);
74    buf.push('\x1f');
75    buf.push_str(&chunk_size.to_string());
76    buf.push('\x1f');
77    buf.push_str(if chunk_dense { "dense_rn" } else { "range" });
78    format!("{:016x}", xxh3_64(buf.as_bytes()))
79}
80
81// ─── Detection + generation ──────────────────────────────────
82
83fn parse_scalar_i64(raw: &str) -> Result<i64> {
84    let t = raw.trim();
85    t.parse::<i64>()
86        .or_else(|_| t.parse::<f64>().map(|x| x as i64))
87        .map_err(|_| anyhow::anyhow!("invalid numeric scalar: {:?}", t))
88}
89
90fn query_wrapped_row_count(src: &mut dyn Source, base_query: &str) -> Result<i64> {
91    let sql = format!("SELECT COUNT(*) FROM ({}) AS _rivet_rowcnt", base_query);
92    let raw = src
93        .query_scalar(&sql)?
94        .ok_or_else(|| anyhow::anyhow!("COUNT(*) returned no row"))?;
95    parse_scalar_i64(&raw)
96}
97
98fn log_chunk_boundaries_list(export_name: &str, chunks: &[(i64, i64)]) {
99    const HEAD: usize = 8;
100    const TAIL: usize = 8;
101    if chunks.is_empty() {
102        log::info!(
103            "export '{}': no BETWEEN windows (empty key range)",
104            export_name
105        );
106        return;
107    }
108    if chunks.len() <= HEAD + TAIL {
109        for (i, (a, b)) in chunks.iter().enumerate() {
110            log::info!("export '{}':   [{:>5}] {} .. {}", export_name, i, a, b);
111        }
112    } else {
113        for (i, (a, b)) in chunks.iter().enumerate().take(HEAD) {
114            log::info!("export '{}':   [{:>5}] {} .. {}", export_name, i, a, b);
115        }
116        log::info!(
117            "export '{}':   ... {} windows omitted ...",
118            export_name,
119            chunks.len() - HEAD - TAIL
120        );
121        for (i, (a, b)) in chunks.iter().enumerate().skip(chunks.len() - TAIL) {
122            log::info!("export '{}':   [{:>5}] {} .. {}", export_name, i, a, b);
123        }
124    }
125}
126
127fn log_chunk_sparsity_at_run(
128    export_name: &str,
129    chunk_column: &str,
130    chunk_size: usize,
131    min_val: i64,
132    max_val: i64,
133    chunks: &[(i64, i64)],
134    row_count: i64,
135) {
136    if row_count == 0 {
137        log::info!(
138            "export '{}': COUNT(*) = 0 — no rows in export query; {} BETWEEN window(s) from `{}` min..max (runs will skip empty chunks)",
139            export_name,
140            chunks.len(),
141            chunk_column
142        );
143        if !chunks.is_empty() && chunks.len() <= 24 {
144            log_chunk_boundaries_list(export_name, chunks);
145        }
146        return;
147    }
148
149    let info = chunk_sparsity_from_counts(row_count, min_val, max_val, chunk_size);
150    if info.is_sparse {
151        let fill_pct = info.density * 100.0;
152        let empty_hint = (1.0 - info.density).clamp(0.0, 1.0) * 100.0;
153        log::info!(
154            "export '{}': sparse `{}` range — ~{:.2}% of the min..max ID band contains rows (~{:.1}% of logical windows likely empty). \
155             rows={}, span≈{}, chunk_size={}, ~{} logical windows, {} BETWEEN chunks. Computed boundaries:",
156            export_name,
157            chunk_column,
158            fill_pct,
159            empty_hint,
160            info.row_count,
161            info.range_span,
162            chunk_size,
163            info.logical_windows,
164            chunks.len(),
165        );
166        log_chunk_boundaries_list(export_name, chunks);
167    } else {
168        log::info!(
169            "export '{}': `{}` range looks dense enough for BETWEEN chunking (rows={}, span≈{}, density≈{:.6}, {} chunks); continuing",
170            export_name,
171            chunk_column,
172            info.row_count,
173            info.range_span,
174            info.density,
175            chunks.len(),
176        );
177    }
178}
179
180pub(crate) fn detect_and_generate_chunks(
181    src: &mut dyn Source,
182    base_query: &str,
183    chunk_column: &str,
184    chunk_size: usize,
185    export_name: &str,
186    chunk_dense: bool,
187) -> Result<Vec<(i64, i64)>> {
188    if chunk_dense {
189        let row_count = query_wrapped_row_count(src, base_query)?;
190        log::info!(
191            "export '{}': chunk_dense: ROW_NUMBER() OVER (ORDER BY `{}`) — {} row(s), chunk_size={}",
192            export_name,
193            chunk_column,
194            row_count,
195            chunk_size
196        );
197        let chunks = if row_count <= 0 {
198            vec![]
199        } else {
200            generate_chunks(1, row_count, chunk_size as i64)
201        };
202        log::info!(
203            "export '{}': dense chunk plan: {} window(s) on ordinal 1..{}",
204            export_name,
205            chunks.len(),
206            row_count
207        );
208        return Ok(chunks);
209    }
210
211    let min_sql = format!(
212        "SELECT min({col}) FROM ({q}) AS _rivet",
213        col = chunk_column,
214        q = base_query,
215    );
216    let max_sql = format!(
217        "SELECT max({col}) FROM ({q}) AS _rivet",
218        col = chunk_column,
219        q = base_query,
220    );
221
222    let min_val = src
223        .query_scalar(&min_sql)?
224        .and_then(|s| s.trim().parse::<i64>().ok())
225        .unwrap_or(0);
226    let max_val = src
227        .query_scalar(&max_sql)?
228        .and_then(|s| s.trim().parse::<i64>().ok())
229        .unwrap_or(0);
230
231    log::info!(
232        "export '{}': chunk_column `{}` range {} .. {} (chunk_size={})",
233        export_name,
234        chunk_column,
235        min_val,
236        max_val,
237        chunk_size
238    );
239
240    let chunks = generate_chunks(min_val, max_val, chunk_size as i64);
241
242    match query_wrapped_row_count(src, base_query) {
243        Ok(row_count) => {
244            log_chunk_sparsity_at_run(
245                export_name,
246                chunk_column,
247                chunk_size,
248                min_val,
249                max_val,
250                &chunks,
251                row_count,
252            );
253        }
254        Err(e) => {
255            log::warn!(
256                "export '{}': could not run COUNT(*) for sparsity diagnostics: {:#}; proceeding with {} windows from min/max only",
257                export_name,
258                e,
259                chunks.len()
260            );
261            if chunks.len() <= 24 {
262                log_chunk_boundaries_list(export_name, &chunks);
263            } else {
264                log::info!(
265                    "export '{}': use `RUST_LOG=info rivet run` after fixing COUNT if you need the full boundary list",
266                    export_name
267                );
268            }
269        }
270    }
271
272    Ok(chunks)
273}
274
275// ─── Chunked Mode (Sequential) ──────────────────────────────
276
277pub(crate) fn run_chunked_sequential(
278    src: &mut dyn Source,
279    base_query: &str,
280    export: &ExportConfig,
281    tuning: &SourceTuning,
282    validate: bool,
283    summary: &mut RunSummary,
284    state: Option<&StateStore>,
285) -> Result<()> {
286    let col = export
287        .chunk_column
288        .as_deref()
289        .expect("chunk_column required for chunked mode");
290    let chunks = detect_and_generate_chunks(
291        src,
292        base_query,
293        col,
294        export.chunk_size,
295        &export.name,
296        export.chunk_dense,
297    )?;
298
299    log::info!(
300        "export '{}': {} chunks to process sequentially",
301        export.name,
302        chunks.len()
303    );
304
305    for (i, (start, end)) in chunks.iter().enumerate() {
306        if !resource::check_memory(tuning.memory_threshold_mb) {
307            log::warn!("memory threshold exceeded, pausing 5s before chunk {}", i);
308            std::thread::sleep(Duration::from_secs(5));
309        }
310
311        let chunk_query = build_chunk_query_sql(base_query, col, *start, *end, export.chunk_dense);
312        log::info!(
313            "export '{}': chunk {}/{} ({}..{})",
314            export.name,
315            i + 1,
316            chunks.len(),
317            start,
318            end
319        );
320
321        let mut sink = ExportSink::new(export)?;
322        src.export(&chunk_query, None, None, tuning, &mut sink)?;
323        if let Some(w) = sink.writer.take() {
324            w.finish()?;
325        }
326
327        summary.total_rows += sink.total_rows as i64;
328        log::info!(
329            "export '{}': chunk {} -- {} rows",
330            export.name,
331            i + 1,
332            sink.total_rows
333        );
334
335        if sink.total_rows > 0 {
336            if validate {
337                validate_output(sink.tmp.path(), export.format, sink.total_rows)?;
338                summary.validated = Some(true);
339            }
340            let file_bytes = std::fs::metadata(sink.tmp.path())
341                .map(|m| m.len())
342                .unwrap_or(0);
343            summary.bytes_written += file_bytes;
344            summary.files_produced += 1;
345
346            let fmt =
347                format::create_format(export.format, export.compression, export.compression_level);
348            let file_name = format!(
349                "{}_{}_chunk{}.{}",
350                export.name,
351                chrono::Utc::now().format("%Y%m%d_%H%M%S"),
352                i,
353                fmt.file_extension()
354            );
355            let dest = destination::create_destination(&export.destination)?;
356            dest.write(sink.tmp.path(), &file_name)?;
357
358            if let Some(st) = state {
359                let _ = st.record_file(
360                    &summary.run_id,
361                    &export.name,
362                    &file_name,
363                    sink.total_rows as i64,
364                    file_bytes as i64,
365                    &format!("{:?}", export.format).to_lowercase(),
366                    Some(&format!("{:?}", export.compression).to_lowercase()),
367                );
368            }
369        }
370    }
371
372    log::info!("export '{}': all chunks completed", export.name);
373    Ok(())
374}
375
376// ─── Chunked Mode (Parallel) ────────────────────────────────
377
378#[allow(clippy::too_many_arguments)]
379pub(super) fn run_chunked_parallel(
380    source_config: &SourceConfig,
381    state: &StateStore,
382    export: &ExportConfig,
383    tuning: &SourceTuning,
384    config_dir: &Path,
385    validate: bool,
386    summary: &mut RunSummary,
387    params: Option<&std::collections::HashMap<String, String>>,
388) -> Result<()> {
389    let base_query = export.resolve_query(config_dir, params)?;
390    let col = export
391        .chunk_column
392        .as_deref()
393        .expect("chunk_column required for chunked mode");
394
395    let mut src = source::create_source(source_config)?;
396    let chunks = detect_and_generate_chunks(
397        &mut *src,
398        &base_query,
399        col,
400        export.chunk_size,
401        &export.name,
402        export.chunk_dense,
403    )?;
404    drop(src);
405
406    let total_chunks = chunks.len();
407    let parallel = export.parallel.min(total_chunks);
408    log::info!(
409        "export '{}': {} chunks, {} parallel threads",
410        export.name,
411        total_chunks,
412        parallel
413    );
414
415    let completed = AtomicUsize::new(0);
416    let agg_rows = std::sync::atomic::AtomicI64::new(0);
417    let agg_bytes = std::sync::atomic::AtomicU64::new(0);
418    let agg_files = AtomicUsize::new(0);
419    let errors = std::sync::Mutex::new(Vec::<String>::new());
420    let file_records: std::sync::Mutex<Vec<(String, i64, i64)>> = std::sync::Mutex::new(Vec::new());
421    let semaphore = AtomicUsize::new(0);
422
423    std::thread::scope(|s| {
424        for (i, (start, end)) in chunks.iter().enumerate() {
425            while semaphore.load(Ordering::Relaxed) >= parallel {
426                std::thread::sleep(Duration::from_millis(50));
427            }
428
429            if !resource::check_memory(tuning.memory_threshold_mb) {
430                log::warn!("memory threshold exceeded, waiting before chunk {}", i);
431                while !resource::check_memory(tuning.memory_threshold_mb) {
432                    std::thread::sleep(Duration::from_secs(2));
433                }
434            }
435
436            semaphore.fetch_add(1, Ordering::Relaxed);
437
438            let source_config = source_config.clone();
439            let tuning = tuning.clone();
440            let export_for_sink = export.clone();
441            let export_name = &export.name;
442            let format_type = export.format;
443            let dest_config = &export.destination;
444            let base_query = &base_query;
445            let completed = &completed;
446            let agg_rows = &agg_rows;
447            let agg_bytes = &agg_bytes;
448            let agg_files = &agg_files;
449            let errors = &errors;
450            let file_records = &file_records;
451            let semaphore = &semaphore;
452            let start = *start;
453            let end = *end;
454
455            s.spawn(move || {
456                let result = (|| -> Result<()> {
457                    let chunk_query = build_chunk_query_sql(
458                        base_query,
459                        col,
460                        start,
461                        end,
462                        export_for_sink.chunk_dense,
463                    );
464
465                    let mut thread_src = source::create_source(&source_config)?;
466                    let mut sink = ExportSink::new(&export_for_sink)?;
467                    thread_src.export(&chunk_query, None, None, &tuning, &mut sink)?;
468                    if let Some(w) = sink.writer.take() {
469                        w.finish()?;
470                    }
471
472                    agg_rows.fetch_add(sink.total_rows as i64, Ordering::Relaxed);
473
474                    if sink.total_rows > 0 {
475                        if validate {
476                            validate_output(sink.tmp.path(), format_type, sink.total_rows)?;
477                        }
478                        let file_bytes = std::fs::metadata(sink.tmp.path())
479                            .map(|m| m.len())
480                            .unwrap_or(0);
481                        agg_bytes.fetch_add(file_bytes, Ordering::Relaxed);
482                        agg_files.fetch_add(1, Ordering::Relaxed);
483
484                        let fmt = format::create_format(
485                            format_type,
486                            export_for_sink.compression,
487                            export_for_sink.compression_level,
488                        );
489                        let file_name = format!(
490                            "{}_{}_chunk{}.{}",
491                            export_name,
492                            chrono::Utc::now().format("%Y%m%d_%H%M%S"),
493                            i,
494                            fmt.file_extension()
495                        );
496                        let dest = destination::create_destination(dest_config)?;
497                        dest.write(sink.tmp.path(), &file_name)?;
498                        file_records
499                            .lock()
500                            .unwrap_or_else(|e| e.into_inner())
501                            .push((file_name, sink.total_rows as i64, file_bytes as i64));
502                    }
503
504                    let done = completed.fetch_add(1, Ordering::Relaxed) + 1;
505                    log::info!(
506                        "export '{}': chunk {}/{} done ({} rows)",
507                        export_name,
508                        done,
509                        total_chunks,
510                        sink.total_rows
511                    );
512                    Ok(())
513                })();
514
515                semaphore.fetch_sub(1, Ordering::Relaxed);
516
517                if let Err(e) = result {
518                    log::error!("export '{}': chunk {} failed: {:#}", export_name, i, e);
519                    errors
520                        .lock()
521                        .unwrap_or_else(|e| e.into_inner())
522                        .push(format!("chunk {}: {:#}", i, e));
523                }
524            });
525        }
526    });
527
528    summary.total_rows = agg_rows.load(Ordering::Relaxed);
529    summary.bytes_written = agg_bytes.load(Ordering::Relaxed);
530    summary.files_produced = agg_files.load(Ordering::Relaxed);
531    if validate {
532        summary.validated = Some(true);
533    }
534
535    let fmt_name = format!("{:?}", export.format).to_lowercase();
536    let comp_name = format!("{:?}", export.compression).to_lowercase();
537    for (fname, rows, bytes) in file_records.into_inner().unwrap_or_else(|e| e.into_inner()) {
538        let _ = state.record_file(
539            &summary.run_id,
540            &export.name,
541            &fname,
542            rows,
543            bytes,
544            &fmt_name,
545            Some(&comp_name),
546        );
547    }
548
549    let errs = errors.into_inner().unwrap_or_else(|e| e.into_inner());
550    if !errs.is_empty() {
551        anyhow::bail!(
552            "export '{}': {} chunks failed:\n{}",
553            export.name,
554            errs.len(),
555            errs.join("\n")
556        );
557    }
558
559    log::info!(
560        "export '{}': all {} chunks completed",
561        export.name,
562        total_chunks
563    );
564    Ok(())
565}
566
567// ─── Chunk checkpoint (SQLite plan + resume) ─────────────────
568
569fn chunk_max_attempts_for_export(export: &ExportConfig, tuning: &SourceTuning) -> u32 {
570    export
571        .chunk_max_attempts
572        .unwrap_or_else(|| tuning.max_retries.saturating_add(1).max(1))
573}
574
575#[allow(clippy::too_many_arguments)]
576fn ensure_chunk_checkpoint_plan(
577    state: &StateStore,
578    export: &ExportConfig,
579    summary: &mut RunSummary,
580    base_query: &str,
581    col: &str,
582    chunks: &[(i64, i64)],
583    resume: bool,
584    tuning: &SourceTuning,
585) -> Result<String> {
586    let plan_hash = chunk_plan_fingerprint(base_query, col, export.chunk_size, export.chunk_dense);
587    let max_att = chunk_max_attempts_for_export(export, tuning);
588
589    if resume {
590        let Some((rid, stored_hash)) = state.find_in_progress_chunk_run(&export.name)? else {
591            anyhow::bail!(
592                "export '{}': --resume but no in-progress chunk checkpoint; run without --resume first or `rivet state reset-chunks --export {}`",
593                export.name,
594                export.name
595            );
596        };
597        if stored_hash != plan_hash {
598            anyhow::bail!(
599                "export '{}': chunk plan fingerprint mismatch (query, chunk_column, chunk_size, or chunk_dense changed); cannot resume",
600                export.name
601            );
602        }
603        summary.run_id = rid.clone();
604        let n = state.reset_stale_running_chunk_tasks(&rid)?;
605        if n > 0 {
606            log::warn!(
607                "export '{}': reset {} stale 'running' chunk task(s) after resume",
608                export.name,
609                n
610            );
611        }
612        return Ok(rid);
613    }
614
615    if let Some((rid, _)) = state.find_in_progress_chunk_run(&export.name)? {
616        anyhow::bail!(
617            "export '{}': chunk checkpoint run '{}' still in progress; use `rivet run --resume` or `rivet state reset-chunks --export {}`",
618            export.name,
619            rid,
620            export.name
621        );
622    }
623
624    state.create_chunk_run(&summary.run_id, &export.name, &plan_hash, max_att)?;
625    state.insert_chunk_tasks(&summary.run_id, chunks)?;
626    log::info!(
627        "export '{}': chunk checkpoint: {} tasks saved (run_id={})",
628        export.name,
629        chunks.len(),
630        summary.run_id
631    );
632    Ok(summary.run_id.clone())
633}
634
635#[allow(clippy::too_many_arguments)]
636fn export_one_chunk_range(
637    src: &mut dyn Source,
638    base_query: &str,
639    col: &str,
640    start: i64,
641    end: i64,
642    chunk_index: i64,
643    export: &ExportConfig,
644    tuning: &SourceTuning,
645    validate: bool,
646) -> Result<(usize, Option<String>, u64)> {
647    let chunk_query = build_chunk_query_sql(base_query, col, start, end, export.chunk_dense);
648
649    let mut sink = ExportSink::new(export)?;
650    src.export(&chunk_query, None, None, tuning, &mut sink)?;
651    if let Some(w) = sink.writer.take() {
652        w.finish()?;
653    }
654
655    if sink.total_rows == 0 {
656        return Ok((0, None, 0));
657    }
658
659    if validate {
660        validate_output(sink.tmp.path(), export.format, sink.total_rows)?;
661    }
662    let file_bytes = std::fs::metadata(sink.tmp.path())
663        .map(|m| m.len())
664        .unwrap_or(0);
665
666    let fmt = format::create_format(export.format, export.compression, export.compression_level);
667    let file_name = format!(
668        "{}_{}_chunk{}.{}",
669        export.name,
670        chrono::Utc::now().format("%Y%m%d_%H%M%S"),
671        chunk_index,
672        fmt.file_extension()
673    );
674    let dest = destination::create_destination(&export.destination)?;
675    dest.write(sink.tmp.path(), &file_name)?;
676
677    Ok((sink.total_rows, Some(file_name), file_bytes))
678}
679
680#[allow(clippy::too_many_arguments)]
681fn run_chunk_with_source_retries(
682    source_config: &SourceConfig,
683    base_query: &str,
684    col: &str,
685    start: i64,
686    end: i64,
687    chunk_index: i64,
688    export: &ExportConfig,
689    tuning: &SourceTuning,
690    validate: bool,
691) -> Result<(usize, Option<String>, u64)> {
692    let mut last_err: Option<anyhow::Error> = None;
693    for attempt in 0..=tuning.max_retries {
694        if attempt > 0 {
695            let (_, _needs_reconnect, extra_delay) = last_err
696                .as_ref()
697                .map(classify_error)
698                .unwrap_or((false, false, 0));
699            let backoff = tuning.retry_backoff_ms * 2u64.pow(attempt - 1) + extra_delay;
700            log::warn!(
701                "export '{}': chunk {} retry {}/{} in {}ms",
702                export.name,
703                chunk_index,
704                attempt,
705                tuning.max_retries,
706                backoff
707            );
708            std::thread::sleep(Duration::from_millis(backoff));
709        }
710
711        let mut src = match source::create_source(source_config) {
712            Ok(s) => s,
713            Err(e) => {
714                let (transient, _, _) = classify_error(&e);
715                if attempt < tuning.max_retries && transient {
716                    last_err = Some(e);
717                    continue;
718                }
719                return Err(e);
720            }
721        };
722
723        match export_one_chunk_range(
724            &mut *src,
725            base_query,
726            col,
727            start,
728            end,
729            chunk_index,
730            export,
731            tuning,
732            validate,
733        ) {
734            Ok(v) => return Ok(v),
735            Err(e) => {
736                let (transient, _, _) = classify_error(&e);
737                if attempt < tuning.max_retries && transient {
738                    last_err = Some(e);
739                    continue;
740                }
741                return Err(e);
742            }
743        }
744    }
745    Err(last_err.unwrap_or_else(|| anyhow::anyhow!("chunk export failed after retries")))
746}
747
748#[allow(clippy::too_many_arguments)]
749pub(crate) fn run_chunked_sequential_checkpoint(
750    src: &mut dyn Source,
751    source_config: &SourceConfig,
752    state: &StateStore,
753    base_query: &str,
754    export: &ExportConfig,
755    tuning: &SourceTuning,
756    validate: bool,
757    summary: &mut RunSummary,
758    st: Option<&StateStore>,
759    resume: bool,
760    config_path: &str,
761) -> Result<()> {
762    let _ = config_path;
763    let col = export
764        .chunk_column
765        .as_deref()
766        .expect("chunk_column required for chunked mode");
767
768    let chunks = if resume {
769        vec![]
770    } else {
771        detect_and_generate_chunks(
772            src,
773            base_query,
774            col,
775            export.chunk_size,
776            &export.name,
777            export.chunk_dense,
778        )?
779    };
780
781    let run_id = ensure_chunk_checkpoint_plan(
782        state, export, summary, base_query, col, &chunks, resume, tuning,
783    )?;
784
785    if !resume && !resource::check_memory(tuning.memory_threshold_mb) {
786        log::warn!("memory threshold exceeded before chunk export; pausing 5s");
787        std::thread::sleep(Duration::from_secs(5));
788    }
789
790    while let Some((chunk_index, sk, ek)) = state.claim_next_chunk_task(&run_id)? {
791        if !resource::check_memory(tuning.memory_threshold_mb) {
792            log::warn!(
793                "memory threshold exceeded, pausing 5s before chunk {}",
794                chunk_index
795            );
796            std::thread::sleep(Duration::from_secs(5));
797        }
798
799        let start: i64 = sk
800            .parse()
801            .map_err(|_| anyhow::anyhow!("chunk {}: invalid start_key {:?}", chunk_index, sk))?;
802        let end: i64 = ek
803            .parse()
804            .map_err(|_| anyhow::anyhow!("chunk {}: invalid end_key {:?}", chunk_index, ek))?;
805
806        log::info!(
807            "export '{}': checkpoint chunk {} ({}..{})",
808            export.name,
809            chunk_index,
810            start,
811            end
812        );
813
814        match run_chunk_with_source_retries(
815            source_config,
816            base_query,
817            col,
818            start,
819            end,
820            chunk_index,
821            export,
822            tuning,
823            validate,
824        ) {
825            Ok((rows, fname, file_bytes)) => {
826                summary.total_rows += rows as i64;
827                if rows > 0 {
828                    summary.bytes_written += file_bytes;
829                    summary.files_produced += 1;
830                    if let Some(name) = &fname
831                        && let Some(store) = st
832                    {
833                        let _ = store.record_file(
834                            &summary.run_id,
835                            &export.name,
836                            name,
837                            rows as i64,
838                            file_bytes as i64,
839                            &format!("{:?}", export.format).to_lowercase(),
840                            Some(&format!("{:?}", export.compression).to_lowercase()),
841                        );
842                    }
843                }
844                state.complete_chunk_task(&run_id, chunk_index, rows as i64, fname.as_deref())?;
845            }
846            Err(e) => {
847                let msg = format!("{:#}", e);
848                log::error!(
849                    "export '{}': chunk {} failed: {}",
850                    export.name,
851                    chunk_index,
852                    msg
853                );
854                state.fail_chunk_task(&run_id, chunk_index, &msg)?;
855            }
856        }
857    }
858
859    let pending = state.count_chunk_tasks_not_completed(&run_id)?;
860    if pending > 0 {
861        anyhow::bail!(
862            "export '{}': chunk checkpoint incomplete ({} tasks not completed); fix errors and `rivet run --resume` or `rivet state reset-chunks`",
863            export.name,
864            pending
865        );
866    }
867
868    state.finalize_chunk_run_completed(&run_id)?;
869    log::info!(
870        "export '{}': chunk checkpoint run completed (run_id={})",
871        export.name,
872        run_id
873    );
874    Ok(())
875}
876
877#[allow(clippy::too_many_arguments)]
878pub(super) fn run_chunked_parallel_checkpoint(
879    config_path: &str,
880    source_config: &SourceConfig,
881    state: &StateStore,
882    export: &ExportConfig,
883    tuning: &SourceTuning,
884    config_dir: &Path,
885    validate: bool,
886    summary: &mut RunSummary,
887    params: Option<&std::collections::HashMap<String, String>>,
888    resume: bool,
889) -> Result<()> {
890    let base_query = export.resolve_query(config_dir, params)?;
891    let col = export
892        .chunk_column
893        .as_deref()
894        .expect("chunk_column required for chunked mode");
895
896    let chunks = if resume {
897        vec![]
898    } else {
899        let mut src = source::create_source(source_config)?;
900        detect_and_generate_chunks(
901            &mut *src,
902            &base_query,
903            col,
904            export.chunk_size,
905            &export.name,
906            export.chunk_dense,
907        )?
908    };
909
910    let run_id = ensure_chunk_checkpoint_plan(
911        state,
912        export,
913        summary,
914        &base_query,
915        col,
916        &chunks,
917        resume,
918        tuning,
919    )?;
920
921    let total_tasks = {
922        let tasks = state.list_chunk_tasks_for_run(&run_id)?;
923        tasks.len().max(1)
924    };
925    let parallel = export.parallel.min(total_tasks);
926    log::info!(
927        "export '{}': chunk checkpoint parallel: {} workers, run_id={}",
928        export.name,
929        parallel,
930        run_id
931    );
932
933    let db_path = state.database_path().to_path_buf();
934    let run_id_arc = std::sync::Arc::new(run_id.clone());
935    let agg_rows = std::sync::atomic::AtomicI64::new(0);
936    let agg_bytes = std::sync::atomic::AtomicU64::new(0);
937    let agg_files = std::sync::atomic::AtomicUsize::new(0);
938    let errors = std::sync::Mutex::new(Vec::<String>::new());
939
940    let export_name = export.name.clone();
941    let export_for_workers = export.clone();
942    let format_type = export.format;
943    let dest_config = export.destination.clone();
944    let tuning_cl = tuning.clone();
945    let source_config_cl = source_config.clone();
946    let base_query_cl = base_query.clone();
947    let col_owned = col.to_string();
948    let config_path_owned = config_path.to_string();
949    let fmt_label = format!("{:?}", export.format).to_lowercase();
950    let comp_label = format!("{:?}", export.compression).to_lowercase();
951
952    std::thread::scope(|s| {
953        for _ in 0..parallel {
954            let db_path = db_path.clone();
955            let run_id_arc = std::sync::Arc::clone(&run_id_arc);
956            let agg_rows = &agg_rows;
957            let agg_bytes = &agg_bytes;
958            let agg_files = &agg_files;
959            let errors = &errors;
960            let export_name = export_name.clone();
961            let export_worker = export_for_workers.clone();
962            let tuning_w = tuning_cl.clone();
963            let source_config_w = source_config_cl.clone();
964            let base_query_w = base_query_cl.clone();
965            let col_w = col_owned.clone();
966            let dest_config = dest_config.clone();
967            let config_path_w = config_path_owned.clone();
968            let fmt_label_w = fmt_label.clone();
969            let comp_label_w = comp_label.clone();
970
971            s.spawn(move || {
972                loop {
973                    let claimed = match StateStore::claim_next_chunk_task_at_path(
974                        &db_path,
975                        run_id_arc.as_str(),
976                    ) {
977                        Ok(c) => c,
978                        Err(e) => {
979                            errors
980                                .lock()
981                                .unwrap_or_else(|e| e.into_inner())
982                                .push(format!("claim error: {:#}", e));
983                            break;
984                        }
985                    };
986                    let Some((chunk_index, sk, ek)) = claimed else {
987                        break;
988                    };
989
990                    if !resource::check_memory(tuning_w.memory_threshold_mb) {
991                        log::warn!("memory threshold exceeded in worker; pausing 2s");
992                        std::thread::sleep(Duration::from_secs(2));
993                    }
994
995                    let start: i64 = match sk.parse() {
996                        Ok(v) => v,
997                        Err(_) => {
998                            let _ = StateStore::fail_chunk_task_at_path(
999                                &db_path,
1000                                run_id_arc.as_str(),
1001                                chunk_index,
1002                                "invalid start_key",
1003                            );
1004                            continue;
1005                        }
1006                    };
1007                    let end: i64 = match ek.parse() {
1008                        Ok(v) => v,
1009                        Err(_) => {
1010                            let _ = StateStore::fail_chunk_task_at_path(
1011                                &db_path,
1012                                run_id_arc.as_str(),
1013                                chunk_index,
1014                                "invalid end_key",
1015                            );
1016                            continue;
1017                        }
1018                    };
1019
1020                    let chunk_query = build_chunk_query_sql(
1021                        &base_query_w,
1022                        &col_w,
1023                        start,
1024                        end,
1025                        export_worker.chunk_dense,
1026                    );
1027
1028                    let result = (|| -> Result<(usize, Option<String>, u64)> {
1029                        let mut last_err: Option<anyhow::Error> = None;
1030                        for attempt in 0..=tuning_w.max_retries {
1031                            if attempt > 0 {
1032                                let (_, _, extra_delay) = last_err
1033                                    .as_ref()
1034                                    .map(classify_error)
1035                                    .unwrap_or((false, false, 0));
1036                                let backoff =
1037                                    tuning_w.retry_backoff_ms * 2u64.pow(attempt - 1) + extra_delay;
1038                                std::thread::sleep(Duration::from_millis(backoff));
1039                            }
1040
1041                            let mut thread_src = match source::create_source(&source_config_w) {
1042                                Ok(s) => s,
1043                                Err(e) => {
1044                                    let (transient, _, _) = classify_error(&e);
1045                                    if attempt < tuning_w.max_retries && transient {
1046                                        last_err = Some(e);
1047                                        continue;
1048                                    }
1049                                    return Err(e);
1050                                }
1051                            };
1052
1053                            let mut sink = ExportSink::new(&export_worker)?;
1054
1055                            let export_attempt = (|| -> Result<(usize, Option<String>, u64)> {
1056                                thread_src.export(
1057                                    &chunk_query,
1058                                    None,
1059                                    None,
1060                                    &tuning_w,
1061                                    &mut sink,
1062                                )?;
1063                                if let Some(w) = sink.writer.take() {
1064                                    w.finish()?;
1065                                }
1066                                if sink.total_rows == 0 {
1067                                    return Ok((0, None, 0));
1068                                }
1069                                if validate {
1070                                    validate_output(sink.tmp.path(), format_type, sink.total_rows)?;
1071                                }
1072                                let file_bytes = std::fs::metadata(sink.tmp.path())
1073                                    .map(|m| m.len())
1074                                    .unwrap_or(0);
1075                                let fmt = format::create_format(
1076                                    format_type,
1077                                    export_worker.compression,
1078                                    export_worker.compression_level,
1079                                );
1080                                let file_name = format!(
1081                                    "{}_{}_chunk{}.{}",
1082                                    export_name,
1083                                    chrono::Utc::now().format("%Y%m%d_%H%M%S"),
1084                                    chunk_index,
1085                                    fmt.file_extension()
1086                                );
1087                                let dest = destination::create_destination(&dest_config)?;
1088                                dest.write(sink.tmp.path(), &file_name)?;
1089                                Ok((sink.total_rows, Some(file_name), file_bytes))
1090                            })();
1091
1092                            match export_attempt {
1093                                Ok(v) => return Ok(v),
1094                                Err(e) => {
1095                                    let (transient, _, _) = classify_error(&e);
1096                                    if attempt < tuning_w.max_retries && transient {
1097                                        last_err = Some(e);
1098                                        continue;
1099                                    }
1100                                    return Err(e);
1101                                }
1102                            }
1103                        }
1104                        Err(last_err
1105                            .unwrap_or_else(|| anyhow::anyhow!("chunk failed after retries")))
1106                    })();
1107
1108                    match result {
1109                        Ok((rows, fname, file_bytes)) => {
1110                            agg_rows.fetch_add(rows as i64, Ordering::Relaxed);
1111                            if rows > 0 {
1112                                agg_bytes.fetch_add(file_bytes, Ordering::Relaxed);
1113                                agg_files.fetch_add(1, Ordering::Relaxed);
1114                                if let (Some(name), Ok(store)) =
1115                                    (fname.as_ref(), StateStore::open(&config_path_w))
1116                                {
1117                                    let _ = store.record_file(
1118                                        run_id_arc.as_str(),
1119                                        &export_name,
1120                                        name,
1121                                        rows as i64,
1122                                        file_bytes as i64,
1123                                        &fmt_label_w,
1124                                        Some(comp_label_w.as_str()),
1125                                    );
1126                                }
1127                            }
1128                            let _ = StateStore::complete_chunk_task_at_path(
1129                                &db_path,
1130                                run_id_arc.as_str(),
1131                                chunk_index,
1132                                rows as i64,
1133                                fname.as_deref(),
1134                            );
1135                        }
1136                        Err(e) => {
1137                            let msg = format!("{:#}", e);
1138                            let _ = StateStore::fail_chunk_task_at_path(
1139                                &db_path,
1140                                run_id_arc.as_str(),
1141                                chunk_index,
1142                                &msg,
1143                            );
1144                            errors
1145                                .lock()
1146                                .unwrap_or_else(|e| e.into_inner())
1147                                .push(format!("chunk {}: {}", chunk_index, msg));
1148                        }
1149                    }
1150                }
1151            });
1152        }
1153    });
1154
1155    summary.total_rows = agg_rows.load(Ordering::Relaxed);
1156    summary.bytes_written = agg_bytes.load(Ordering::Relaxed);
1157    summary.files_produced = agg_files.load(Ordering::Relaxed);
1158    if validate {
1159        summary.validated = Some(true);
1160    }
1161
1162    let errs = errors.into_inner().unwrap_or_else(|e| e.into_inner());
1163    if !errs.is_empty() {
1164        anyhow::bail!(
1165            "export '{}': parallel checkpoint worker errors:\n{}",
1166            export.name,
1167            errs.join("\n")
1168        );
1169    }
1170
1171    let pending = state.count_chunk_tasks_not_completed(&run_id)?;
1172    if pending > 0 {
1173        anyhow::bail!(
1174            "export '{}': {} chunk task(s) not completed; `rivet run --resume` or inspect `rivet state chunks --export {}`",
1175            export.name,
1176            pending,
1177            export.name
1178        );
1179    }
1180
1181    state.finalize_chunk_run_completed(&run_id)?;
1182    log::info!(
1183        "export '{}': chunk checkpoint parallel run completed",
1184        export.name
1185    );
1186    Ok(())
1187}
1188
#[cfg(test)]
mod tests {
    use super::*;

    /// A range that is not a multiple of the chunk size ends with a shorter last chunk.
    #[test]
    fn test_generate_chunks() {
        let expected: Vec<(i64, i64)> = vec![(1, 30), (31, 60), (61, 90), (91, 100)];
        assert_eq!(generate_chunks(1, 100, 30), expected);
    }

    /// A range that divides evenly yields only full-size chunks.
    #[test]
    fn test_generate_chunks_exact() {
        let expected: Vec<(i64, i64)> = vec![(0, 49), (50, 99)];
        assert_eq!(generate_chunks(0, 99, 50), expected);
    }

    /// A chunk size larger than the range yields one chunk covering the whole range.
    #[test]
    fn test_generate_chunks_single() {
        let expected: Vec<(i64, i64)> = vec![(1, 10)];
        assert_eq!(generate_chunks(1, 10, 100), expected);
    }

    /// `max < min` produces no chunks.
    #[test]
    fn test_generate_chunks_empty() {
        let chunks = generate_chunks(10, 5, 100);
        assert!(chunks.is_empty());
    }

    /// Range mode filters directly on the chunk column and adds no row numbering.
    #[test]
    fn test_build_chunk_query_range_mode() {
        let sql = build_chunk_query_sql("SELECT id FROM t", "id", 1, 100, false);
        let has_range_filter = sql.contains("WHERE id BETWEEN 1 AND 100");
        assert!(has_range_filter, "got: {}", sql);
        let has_row_number = sql.contains("ROW_NUMBER()");
        assert!(!has_row_number, "got: {}", sql);
    }

    /// Dense mode numbers the rows and filters on the synthetic ordinal column.
    #[test]
    fn test_build_chunk_query_dense_mode() {
        let sql = build_chunk_query_sql("SELECT id FROM t", "id", 1, 5000, true);
        for needle in ["ROW_NUMBER()", RIVET_CHUNK_RN_COL, "BETWEEN 1 AND 5000"] {
            assert!(sql.contains(needle), "got: {}", sql);
        }
    }
}