use std::time::Duration;
use super::super::{
RunSummary,
progress::ChunkProgress,
retry::{RetryClass, classify_error},
sink::ExportSink,
validate::validate_output,
};
use super::{
ChunkSource, chunked_plan, config_hint, detect_and_generate_chunks,
ensure_chunk_checkpoint_plan,
};
use crate::error::Result;
use crate::journal::RunEvent;
use crate::plan::{ChunkedPlan, ResolvedRunPlan};
use crate::source::{self, Source};
use crate::state::StateStore;
use crate::{destination, format, resource};
use super::math::build_chunk_query_sql;
/// Exports a single chunk of a chunked export and publishes it as one part file.
///
/// The chunk's SQL is derived from `base_query` and the key bounds `start` /
/// `end` via `build_chunk_query_sql`, then streamed from `src` into a temporary
/// [`ExportSink`]. When the chunk produced at least one row, the temp file is
/// optionally validated (`plan.validate`) and then written to the plan's
/// destination under the chunk's part file name.
///
/// Side effects on `summary`:
/// - records the destination schema fingerprint when the sink reports one;
/// - sets `validated = Some(true)` after a successful validation.
///
/// Returns `(rows, Some(part_record))` for a non-empty chunk. Returns `(0, None)`
/// for an empty chunk, in which case no part file is written.
///
/// # Errors
///
/// Propagates failures from sink creation, the source export, finishing the
/// writer, output validation, destination creation, and writing the part file.
#[allow(clippy::too_many_arguments)]
fn export_one_chunk_range(
    src: &mut dyn Source,
    base_query: &str,
    cp: &ChunkedPlan,
    start: i64,
    end: i64,
    chunk_index: i64,
    plan: &ResolvedRunPlan,
    summary: &mut RunSummary,
) -> Result<(usize, Option<super::super::commit::PartRecord>)> {
    // Narrow the base query down to this chunk's key range.
    let range_sql = build_chunk_query_sql(
        base_query,
        &cp.column,
        start,
        end,
        cp.dense,
        cp.by_days.is_some(),
        plan.source.source_type,
    );

    let mut out = ExportSink::new(plan)?;
    let request = source::ExportRequest::wrapped(
        &range_sql,
        base_query,
        &plan.tuning,
        &plan.column_overrides,
    );
    src.export(&request, &mut out)?;

    // Finish the writer (if one was opened) before the temp file is validated
    // or handed to the destination below.
    if let Some(writer) = out.writer.take() {
        writer.finish()?;
    }

    if let Some(schema) = out.dest_schema.as_deref() {
        super::super::manifest_writer::record_run_schema_fingerprint(summary, schema);
    }

    let rows = out.total_rows;
    if rows == 0 {
        // Empty range: nothing to publish.
        return Ok((0, None));
    }

    if plan.validate {
        validate_output(out.tmp.path(), plan.format, rows)?;
        summary.validated = Some(true);
    }

    // The format object is only needed here for its file extension.
    let part_format =
        format::create_format(plan.format, plan.compression, plan.compression_level, None);
    let part_name =
        super::chunk_part_filename(&plan.export_name, chunk_index, part_format.file_extension());

    let target = destination::create_destination(&plan.destination)?;
    let record = super::super::commit::write_part_file(
        target.as_ref(),
        out.tmp.path(),
        rows as i64,
        part_name,
    )?;
    Ok((rows, Some(record)))
}
/// Exports one chunk range with retries on transient failures.
///
/// A new source is created on every attempt (via `source::create_source`), and
/// [`export_one_chunk_range`] is run against it. Attempt `0` runs immediately.
/// Attempt `k > 0` first sleeps for
/// `retry_backoff_ms * 2^(k-1) + class.extra_delay_ms()` milliseconds, where
/// `class` is the [`RetryClass`] of the previous error.
///
/// The backoff arithmetic saturates at `u64::MAX` instead of overflowing. This
/// means a large `tuning.max_retries` cannot panic in debug builds or wrap to a
/// tiny delay in release builds.
///
/// At most `tuning.max_retries + 1` attempts are made, and every retry
/// increments `summary.retries`.
///
/// # Errors
///
/// Returns the first error that [`classify_error`] does not classify as
/// transient. Otherwise, returns the final attempt's error once retries are
/// exhausted. Errors from creating the source and from the export are treated
/// the same way.
#[allow(clippy::too_many_arguments)]
fn run_chunk_with_source_retries(
    base_query: &str,
    cp: &ChunkedPlan,
    start: i64,
    end: i64,
    chunk_index: i64,
    plan: &ResolvedRunPlan,
    summary: &mut RunSummary,
) -> Result<(usize, Option<super::super::commit::PartRecord>)> {
    let max_retries = plan.tuning.max_retries;
    let mut last_err: Option<anyhow::Error> = None;
    for attempt in 0..=max_retries {
        if attempt > 0 {
            summary.retries = summary.retries.saturating_add(1);
            let class = last_err
                .as_ref()
                .map(classify_error)
                .unwrap_or(RetryClass::Permanent);
            // Saturating math: the multiply overflows long before attempt 64 for
            // realistic base delays, and `2u64.pow` itself overflows after that.
            let backoff = plan
                .tuning
                .retry_backoff_ms
                .saturating_mul(2u64.saturating_pow(attempt - 1))
                .saturating_add(class.extra_delay_ms());
            log::warn!(
                "export '{}': chunk {} retry {}/{} in {}ms",
                plan.export_name,
                chunk_index,
                attempt,
                plan.tuning.max_retries,
                backoff
            );
            std::thread::sleep(Duration::from_millis(backoff));
        }
        // Fresh source per attempt (presumably so a broken connection from a
        // failed attempt is never reused). The source is dropped when the
        // closure returns.
        let outcome = source::create_source(&plan.source).and_then(|mut src| {
            export_one_chunk_range(
                &mut *src,
                base_query,
                cp,
                start,
                end,
                chunk_index,
                plan,
                summary,
            )
        });
        match outcome {
            Ok(v) => return Ok(v),
            // Only transient errors with attempts remaining are retried.
            Err(e) if attempt < max_retries && classify_error(&e).is_transient() => {
                last_err = Some(e);
            }
            Err(e) => return Err(e),
        }
    }
    // The final attempt always returns above; this fallback is defensive.
    Err(last_err.unwrap_or_else(|| anyhow::anyhow!("chunk export failed after retries")))
}
/// Runs a chunked export one chunk at a time, tracking per-chunk progress in
/// the state store so that an incomplete run can be continued with `--resume`.
///
/// On a fresh run (`plan.resume == false`), chunk ranges come from
/// `chunk_source`: either detected against `src` or precomputed. They are passed
/// to `ensure_chunk_checkpoint_plan`, which returns the run id. On resume, no
/// ranges are generated here and an empty list is passed instead (presumably
/// the stored checkpoint is reused). `apply_m8_resume_decisions` is then applied
/// to that run.
///
/// Chunk tasks are then claimed from the state store one at a time:
/// - each task is exported via [`run_chunk_with_source_retries`];
/// - a successful chunk is recorded and marked complete;
/// - a failed chunk is marked failed, and the loop moves on to the next claimable
///   task.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - chunk detection, checkpoint setup, or any state-store call fails;
/// - a stored chunk key does not parse as `i64`;
/// - any chunk task is still not completed after the loop (the message explains
///   how to resume or reset);
/// - the final run-store commit fails.
pub(crate) fn run_chunked_sequential_checkpoint(
    src: &mut dyn Source,
    state: &StateStore,
    plan: &ResolvedRunPlan,
    summary: &mut RunSummary,
    config_path: &str,
    chunk_source: ChunkSource,
) -> Result<()> {
    let cp = chunked_plan(plan);
    // Only a fresh run computes ranges; on resume an empty list is passed.
    let chunks = if plan.resume {
        vec![]
    } else {
        match chunk_source {
            ChunkSource::Detect => detect_and_generate_chunks(
                src,
                &plan.base_query,
                &cp.column,
                cp.chunk_size,
                cp.chunk_count,
                &plan.export_name,
                cp.dense,
                cp.by_days,
                plan.source.source_type,
            )?,
            ChunkSource::Precomputed(ranges) => ranges,
        }
    };
    let run_id = ensure_chunk_checkpoint_plan(state, plan, cp, summary, &chunks, config_path)?;
    if plan.resume {
        let _stats = super::apply_m8_resume_decisions(state, &run_id, plan, summary)?;
    }
    // The total only sizes the progress bar, so a failed lookup is not fatal.
    // It is logged, though, instead of being silently replaced by 1.
    let total_tasks = state.count_chunk_tasks_total(&run_id).unwrap_or_else(|e| {
        log::warn!(
            "export '{}': could not count chunk tasks ({:#}); progress total may be wrong",
            plan.export_name,
            e
        );
        1
    });
    let pb = ChunkProgress::new(&plan.export_name, total_tasks);
    // NOTE(review): on a fresh run under memory pressure, both this check and the
    // in-loop check below fire before the first chunk (up to 10s of pausing).
    // Confirm that the double pause is intended.
    if !plan.resume && !resource::check_memory(plan.tuning.memory_threshold_mb) {
        log::warn!("memory threshold exceeded before chunk export; pausing 5s");
        std::thread::sleep(Duration::from_secs(5));
    }
    while let Some((chunk_index, sk, ek)) = state.claim_next_chunk_task(&run_id)? {
        if !resource::check_memory(plan.tuning.memory_threshold_mb) {
            log::warn!(
                "memory threshold exceeded, pausing 5s before chunk {}",
                chunk_index
            );
            std::thread::sleep(Duration::from_secs(5));
        }
        // Chunk bounds come back from the state store as strings.
        let start: i64 = sk
            .parse()
            .map_err(|_| anyhow::anyhow!("chunk {}: invalid start_key {:?}", chunk_index, sk))?;
        let end: i64 = ek
            .parse()
            .map_err(|_| anyhow::anyhow!("chunk {}: invalid end_key {:?}", chunk_index, ek))?;
        log::info!(
            "export '{}': checkpoint chunk {} ({}..{})",
            plan.export_name,
            chunk_index,
            start,
            end
        );
        summary.journal.record(RunEvent::ChunkStarted {
            chunk_index,
            start_key: sk.clone(),
            end_key: ek.clone(),
        });
        match run_chunk_with_source_retries(
            &plan.base_query,
            cp,
            start,
            end,
            chunk_index,
            plan,
            summary,
        ) {
            Ok((rows, part)) => {
                summary.total_rows += rows as i64;
                pb.inc(summary.total_rows);
                // Non-empty chunks go through `record_part`. Empty chunks produce
                // no file and get a zero-row completion event here instead.
                let fname: Option<String> = match &part {
                    Some(rec) => {
                        super::super::commit::record_part(
                            plan,
                            summary,
                            Some(state),
                            rec,
                            super::super::commit::PartKind::Chunk { chunk_index },
                        );
                        Some(rec.file_name.clone())
                    }
                    None => {
                        summary.journal.record(RunEvent::ChunkCompleted {
                            chunk_index,
                            rows: 0,
                            file_name: None,
                        });
                        None
                    }
                };
                // Fault-injection points (per their names, they may panic)
                // placed on either side of the state-store completion write.
                crate::test_hook::maybe_panic_at_chunk("after_chunk_file", chunk_index);
                state.complete_chunk_task(&run_id, chunk_index, rows as i64, fname.as_deref())?;
                crate::test_hook::maybe_panic_at_chunk("after_chunk_complete", chunk_index);
            }
            Err(e) => {
                let msg = crate::redact::redact_error(&e);
                log::error!(
                    "export '{}': chunk {} failed: {}",
                    plan.export_name,
                    chunk_index,
                    msg
                );
                // NOTE(review): `attempt` is hard-coded to 1, even though
                // run_chunk_with_source_retries may have made up to
                // max_retries + 1 attempts.
                summary.journal.record(RunEvent::ChunkFailed {
                    chunk_index,
                    error: msg.clone(),
                    attempt: 1,
                });
                state.fail_chunk_task(&run_id, chunk_index, &msg)?;
            }
        }
    }
    let pending = state.count_chunk_tasks_not_completed(&run_id)?;
    if pending > 0 {
        anyhow::bail!(
            "export '{}': chunk checkpoint incomplete ({} tasks not completed); fix errors and `rivet run {} --export {} --resume` or `rivet state reset-chunks {} --export {}`",
            plan.export_name,
            pending,
            config_hint(config_path),
            plan.export_name,
            config_hint(config_path),
            plan.export_name
        );
    }
    pb.finish(summary.total_rows);
    state.finalize_chunk_run_completed(&run_id)?;
    super::super::run_store::RunStore::finalize(state, plan, summary)
        .with_progression(super::super::run_store::Progression::Chunked)
        .commit()?;
    log::info!(
        "export '{}': chunk checkpoint run completed (run_id={})",
        plan.export_name,
        run_id
    );
    Ok(())
}