use super::manifest_writer;
use super::{RunSummary, sink::ExportSink};
use crate::config::IncrementalCursorMode;
use crate::error::Result;
use crate::plan::{ExtractionStrategy, IncrementalCursorPlan, KeysetPlan, ResolvedRunPlan};
use crate::source::{self, Source};
use crate::state::StateStore;
use crate::types::CursorState;
use crate::{destination, format};
/// Borrows the keyset parameters stored in `plan.strategy`.
///
/// # Panics
///
/// Panics when the plan's strategy is any variant other than
/// `ExtractionStrategy::Keyset`; reaching this runner with such a plan is a
/// dispatch bug rather than a recoverable condition.
fn keyset_plan(plan: &ResolvedRunPlan) -> &KeysetPlan {
    if let ExtractionStrategy::Keyset(keyset) = &plan.strategy {
        keyset
    } else {
        unreachable!("keyset runner called with non-keyset plan")
    }
}
/// Exports a keyset-paginated plan one page at a time.
///
/// Every iteration issues `plan.base_query` with a page limit of `chunk_size`
/// and, from the second page onward, a cursor carrying the key value that the
/// previous page's sink captured. Each non-empty page is written to the
/// destination as its own set of parts, and every part is handed to
/// `commit::record_part` together with `summary` and `state`.
///
/// The loop stops on an empty page, or after writing a page that holds fewer
/// than `chunk_size` rows.
///
/// # Errors
///
/// Propagates failures from sink construction, the source export, writer
/// finalisation, destination creation and part writing. Also fails when a
/// full page leaves no cursor value on the sink, since the following page
/// could not be positioned.
///
/// # Panics
///
/// Panics (through `keyset_plan`) if `plan.strategy` is not the keyset variant.
pub(crate) fn run_keyset(
    src: &mut dyn Source,
    plan: &ResolvedRunPlan,
    summary: &mut RunSummary,
    state: Option<&StateStore>,
) -> Result<()> {
    let keyset = keyset_plan(plan);
    // The key column is handed to the source as a single-column incremental cursor plan.
    let seek_plan = IncrementalCursorPlan {
        primary_column: keyset.key_column.clone(),
        fallback_column: None,
        mode: IncrementalCursorMode::SingleColumn,
    };
    log::info!(
        "export '{}': keyset (seek) pagination on '{}', page size {}",
        plan.export_name,
        keyset.key_column,
        keyset.chunk_size
    );

    // Key value captured from the previous page; `None` until a first full page is written.
    let mut resume_from: Option<String> = None;
    let mut pages_done: usize = 0;
    loop {
        // Zero-based index of the page this iteration produces.
        let page_index = pages_done;
        let resume_cursor = resume_from.clone().map(|value| CursorState {
            export_name: plan.export_name.clone(),
            last_cursor_value: Some(value),
            last_run_at: None,
        });

        // A fresh sink per page keeps row counts and the captured cursor page-local.
        let mut sink = ExportSink::new(plan)?;
        src.export(
            &source::ExportRequest::unwrapped(
                &plan.base_query,
                &plan.tuning,
                &plan.column_overrides,
            )
            .with_incremental(Some(&seek_plan))
            .with_cursor(resume_cursor.as_ref())
            .with_page_limit(keyset.chunk_size),
            &mut sink,
        )?;
        if let Some(writer) = sink.writer.take() {
            writer.finish()?;
        }
        if let Some(schema) = sink.dest_schema.as_deref() {
            manifest_writer::record_run_schema_fingerprint(summary, schema);
        }

        let page_rows = sink.total_rows;
        if page_rows == 0 {
            // Nothing came back for this page: no file is written for it.
            break;
        }
        summary.total_rows += page_rows as i64;

        // The format object is only consulted for its file extension here.
        let encoder =
            format::create_format(plan.format, plan.compression, plan.compression_level, None);
        let stem = format!(
            "{}_{}_keyset{}.{}",
            plan.export_name,
            chrono::Utc::now().format("%Y%m%d_%H%M%S"),
            page_index,
            encoder.file_extension()
        );
        let target = destination::create_destination(&plan.destination)?;
        let validate_as = if plan.validate {
            Some(plan.format)
        } else {
            None
        };
        let parts = super::commit::write_sink_parts(
            target.as_ref(),
            &mut sink,
            validate_as,
            |part_idx, part_count| super::commit::part_indexed_name(&stem, part_idx, part_count),
        )?;
        if plan.validate {
            summary.validated = Some(true);
        }
        for part in &parts {
            super::commit::record_part(
                plan,
                summary,
                state,
                part,
                super::commit::PartKind::Page {
                    page_index: page_index as i64,
                },
            );
        }
        log::info!(
            "export '{}': keyset page {} — {} rows",
            plan.export_name,
            page_index,
            page_rows
        );
        pages_done += 1;

        // A page shorter than the limit is the last one.
        if page_rows < keyset.chunk_size {
            break;
        }

        // A full page was written, so another request follows; it needs the key the sink
        // captured for this page.
        resume_from = sink.last_cursor_value.clone();
        if resume_from.is_none() {
            anyhow::bail!(
                "export '{}': keyset could not read the '{}' value from the last row of page {} \
                 (NULL or unsupported type) — cannot advance safely. The key must be NOT NULL and \
                 one of: integer, float, string, timestamp, date, uuid.",
                plan.export_name,
                keyset.key_column,
                page_index
            );
        }
    }

    log::info!(
        "export '{}': keyset complete — {} page(s), {} rows",
        plan.export_name,
        pages_done,
        summary.total_rows
    );
    Ok(())
}