use super::summary::RunSummary;
use crate::error::Result;
use crate::plan::ResolvedRunPlan;
use crate::state::StateStore;
pub(crate) enum Progression {
Incremental { last_value: String },
Chunked,
}
pub(crate) struct RunStore<'a> {
state: &'a StateStore,
plan: &'a ResolvedRunPlan,
summary: &'a RunSummary,
cursor: Option<String>,
progression: Option<Progression>,
}
impl<'a> RunStore<'a> {
pub fn finalize(
state: &'a StateStore,
plan: &'a ResolvedRunPlan,
summary: &'a RunSummary,
) -> Self {
Self {
state,
plan,
summary,
cursor: None,
progression: None,
}
}
pub fn with_cursor(mut self, value: String) -> Self {
self.cursor = Some(value);
self
}
pub fn with_progression(mut self, p: Progression) -> Self {
self.progression = Some(p);
self
}
pub fn commit(self) -> Result<()> {
if let Some(cursor_val) = self.cursor.as_deref() {
self.state.update(&self.plan.export_name, cursor_val)?;
crate::test_hook::maybe_panic_at("after_cursor_commit");
log::info!(
"export '{}': cursor updated to '{}'",
self.plan.export_name,
cursor_val
);
}
if let Some(p) = self.progression {
match p {
Progression::Incremental { last_value } => {
if let Err(e) = self.state.record_committed_incremental(
&self.plan.export_name,
&last_value,
&self.summary.run_id,
) {
log::warn!(
"export '{}': committed boundary update failed: {:#}",
self.plan.export_name,
e
);
}
}
Progression::Chunked => {
super::chunked::record_chunked_commit(
self.state,
&self.plan.export_name,
&self.summary.run_id,
);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{
CompressionType, DestinationConfig, DestinationType, FormatType, SourceConfig, SourceType,
};
use crate::plan::{ExtractionStrategy, ResolvedRunPlan};
use crate::tuning::SourceTuning;
fn test_plan(export_name: &str) -> ResolvedRunPlan {
ResolvedRunPlan {
export_name: export_name.into(),
base_query: "SELECT 1".into(),
strategy: ExtractionStrategy::Snapshot,
format: FormatType::Parquet,
compression: CompressionType::None,
compression_level: None,
max_file_size_bytes: None,
skip_empty: false,
meta_columns: Default::default(),
destination: DestinationConfig {
destination_type: DestinationType::Local,
path: Some("/tmp".into()),
..Default::default()
},
quality: None,
tuning: SourceTuning::from_config(None),
tuning_profile_label: "balanced".into(),
validate: false,
reconcile: false,
resume: false,
source: SourceConfig {
source_type: SourceType::Postgres,
url: Some("postgresql://nobody@127.0.0.1:9999/nonexistent".into()),
url_env: None,
url_file: None,
host: None,
port: None,
user: None,
password: None,
password_env: None,
database: None,
environment: None,
tuning: None,
tls: None,
},
column_overrides: Default::default(),
verify: crate::config::VerifyMode::Size,
schema_drift_policy: Default::default(),
shape_drift_warn_factor: 0.0,
parquet: None,
}
}
fn test_summary(plan: &ResolvedRunPlan, run_id: &str) -> RunSummary {
let mut s = RunSummary::stub_for_testing(run_id, plan.export_name.clone());
s.batch_size = 10_000;
s.mode = "snapshot".into();
s.compression = "none".into();
s
}
#[test]
fn finalize_with_no_writes_is_a_noop() {
let state = StateStore::open_in_memory().unwrap();
let plan = test_plan("orders");
let summary = test_summary(&plan, "run-noop");
RunStore::finalize(&state, &plan, &summary)
.commit()
.unwrap();
}
#[test]
fn finalize_with_cursor_only_advances_state_cursor() {
let state = StateStore::open_in_memory().unwrap();
let plan = test_plan("orders");
let summary = test_summary(&plan, "run-1");
RunStore::finalize(&state, &plan, &summary)
.with_cursor("2026-05-30T12:00:00Z".into())
.commit()
.unwrap();
let cursor = state.get("orders").unwrap();
assert_eq!(
cursor.last_cursor_value.as_deref(),
Some("2026-05-30T12:00:00Z"),
"I3: state.update must persist the cursor value"
);
}
#[test]
fn finalize_with_progression_incremental_writes_progression_row() {
let state = StateStore::open_in_memory().unwrap();
let plan = test_plan("orders");
let summary = test_summary(&plan, "run-2");
RunStore::finalize(&state, &plan, &summary)
.with_progression(Progression::Incremental {
last_value: "42".into(),
})
.commit()
.unwrap();
let prog = state.get_progression("orders").unwrap();
let boundary = prog.committed.expect("committed boundary written");
assert_eq!(boundary.strategy, "incremental");
assert_eq!(boundary.cursor.as_deref(), Some("42"));
assert_eq!(boundary.run_id.as_deref(), Some("run-2"));
}
#[test]
fn finalize_with_progression_chunked_picks_highest_completed_chunk_index() {
let state = StateStore::open_in_memory().unwrap();
let plan = test_plan("orders");
let summary = test_summary(&plan, "run-3");
state.create_chunk_run("run-3", "orders", "h", 3).unwrap();
state
.insert_chunk_tasks("run-3", &[(1, 10), (11, 20), (21, 30)])
.unwrap();
state
.complete_chunk_task("run-3", 0, 10, Some("c0.parquet"))
.unwrap();
state
.complete_chunk_task("run-3", 2, 30, Some("c2.parquet"))
.unwrap();
RunStore::finalize(&state, &plan, &summary)
.with_progression(Progression::Chunked)
.commit()
.unwrap();
let prog = state.get_progression("orders").unwrap();
let boundary = prog.committed.expect("committed boundary written");
assert_eq!(boundary.strategy, "chunked");
assert_eq!(
boundary.chunk_index,
Some(2),
"must pick the highest *completed* chunk_index, not the highest claimed"
);
assert_eq!(boundary.run_id.as_deref(), Some("run-3"));
}
#[test]
fn finalize_with_cursor_and_progression_writes_both() {
let state = StateStore::open_in_memory().unwrap();
let plan = test_plan("orders");
let summary = test_summary(&plan, "run-4");
RunStore::finalize(&state, &plan, &summary)
.with_cursor("99".into())
.with_progression(Progression::Incremental {
last_value: "99".into(),
})
.commit()
.unwrap();
assert_eq!(
state.get("orders").unwrap().last_cursor_value.as_deref(),
Some("99")
);
let prog = state.get_progression("orders").unwrap();
let boundary = prog.committed.expect("committed boundary written");
assert_eq!(boundary.cursor.as_deref(), Some("99"));
}
#[test]
fn finalize_chunked_with_no_completed_tasks_is_a_silent_noop() {
let state = StateStore::open_in_memory().unwrap();
let plan = test_plan("orders");
let summary = test_summary(&plan, "run-empty");
state
.create_chunk_run("run-empty", "orders", "h", 3)
.unwrap();
state
.insert_chunk_tasks("run-empty", &[(1, 10), (11, 20)])
.unwrap();
RunStore::finalize(&state, &plan, &summary)
.with_progression(Progression::Chunked)
.commit()
.unwrap();
let prog = state.get_progression("orders").unwrap();
assert!(
prog.committed.is_none(),
"empty chunk_task → no committed boundary"
);
}
}