use std::collections::HashMap;
use std::path::Path;
use crate::config::{Config, ExportConfig, ExportMode};
use crate::error::Result;
use crate::tuning::{SourceTuning, TuningProfile, merge_tuning_config};
use super::contract::{
ChunkedPlan, ExtractionStrategy, IncrementalCursorPlan, KeysetPlan, ResolvedRunPlan,
};
pub fn build_plan(
config: &Config,
export: &ExportConfig,
config_dir: &Path,
validate: bool,
reconcile: bool,
resume: bool,
params: Option<&HashMap<String, String>>,
) -> Result<ResolvedRunPlan> {
let base_query = export.resolve_query(config_dir, params)?;
let merged = merge_tuning_config(config.source.tuning.as_ref(), export.tuning.as_ref());
let fallback_profile = config
.source
.environment
.map(|e| e.default_profile())
.unwrap_or(TuningProfile::Balanced);
let tuning = SourceTuning::from_config_with_default_profile(merged.as_ref(), fallback_profile);
let profile_label = |p: TuningProfile| match p {
TuningProfile::Fast => "fast",
TuningProfile::Balanced => "balanced",
TuningProfile::Safe => "safe",
};
let env_label = |e: crate::config::SourceEnvironment| match e {
crate::config::SourceEnvironment::Local => "local",
crate::config::SourceEnvironment::Replica => "replica",
crate::config::SourceEnvironment::Production => "production",
};
let tuning_profile_label = match merged.as_ref().and_then(|t| t.profile) {
Some(p) => profile_label(p).to_string(),
None => match config.source.environment {
Some(env) => format!(
"{} (default for environment: {})",
profile_label(fallback_profile),
env_label(env)
),
None => "balanced (default)".to_string(),
},
};
let strategy = match export.mode {
ExportMode::Full => ExtractionStrategy::Snapshot,
ExportMode::Incremental => {
let primary_column = export.cursor_column.clone().ok_or_else(|| {
anyhow::anyhow!(
"export '{}': incremental mode requires 'cursor_column'",
export.name
)
})?;
let fallback_column = export.cursor_fallback_column.clone();
let mode = export.incremental_cursor_mode;
ExtractionStrategy::Incremental(IncrementalCursorPlan {
primary_column,
fallback_column,
mode,
})
}
ExportMode::Chunked => resolve_chunked_strategy(config, export, &tuning)?,
ExportMode::TimeWindow => {
let column = export.time_column.clone().ok_or_else(|| {
anyhow::anyhow!(
"export '{}': time_window mode requires 'time_column'",
export.name
)
})?;
let days_window = export.days_window.ok_or_else(|| {
anyhow::anyhow!(
"export '{}': time_window mode requires 'days_window'",
export.name
)
})?;
ExtractionStrategy::TimeWindow {
column,
column_type: export.time_column_type,
days_window,
}
}
};
let (compression, compression_level) = export.effective_compression();
Ok(ResolvedRunPlan {
export_name: export.name.clone(),
base_query,
strategy,
format: export.format,
compression,
compression_level,
max_file_size_bytes: export.max_file_size_bytes(),
skip_empty: export.skip_empty,
meta_columns: export.meta_columns.clone(),
destination: expand_destination_templates(export.destination.clone(), &export.name),
quality: export.quality.clone(),
tuning,
tuning_profile_label,
validate: validate || reconcile,
reconcile,
resume,
verify: export.verify,
source: config.source.clone(),
column_overrides: parse_column_overrides(&export.columns, &export.name)?,
schema_drift_policy: export.on_schema_drift,
shape_drift_warn_factor: export.shape_drift_warn_factor.unwrap_or(2.0),
parquet: export.parquet.clone(),
})
}
/// Resolves the extraction strategy for an export whose mode is `ExportMode::Chunked`.
///
/// Resolution order:
/// 1. Reject option combinations that are mutually exclusive (`chunk_count` with
///    `chunk_dense` / `chunk_by_days`; `chunk_by_key` with any other chunk-shape option).
/// 2. If `chunk_column` is set and `chunk_size_memory_mb` is not, return a
///    [`ChunkedPlan`] right away — no database round-trip is made on this path.
/// 3. Otherwise the `table:` shortcut is required: the table is probed on the source and
///    the probe result drives the chunk size, the chunk column, or the keyset key.
///
/// The result is one of:
/// * `ExtractionStrategy::Snapshot` — the probed row estimate fits in a single chunk and
///   no explicit chunk shape was requested;
/// * `ExtractionStrategy::Keyset` — `chunk_by_key` was given and validated, or (MySQL
///   sources only) a keyset key was auto-selected because no single-integer PK exists;
/// * `ExtractionStrategy::Chunked` — range chunking on the explicit or auto-resolved column.
///
/// # Errors
/// Returns an error for conflicting options, for a missing `table:` when one is needed,
/// when the source URL cannot be resolved, when the introspection probe fails, when
/// `chunk_by_key` is not a usable keyset key, or when no safe chunking shape exists.
fn resolve_chunked_strategy(
    config: &Config,
    export: &ExportConfig,
    tuning: &SourceTuning,
) -> Result<ExtractionStrategy> {
    // --- Option validation: `chunk_count` -------------------------------------------
    if let Some(count) = export.chunk_count {
        if count == 0 {
            anyhow::bail!("export '{}': chunk_count must be >= 1 (got 0)", export.name);
        }
        if export.chunk_dense {
            anyhow::bail!(
                "export '{}': chunk_count and chunk_dense are mutually exclusive",
                export.name
            );
        }
        if export.chunk_by_days.is_some() {
            anyhow::bail!(
                "export '{}': chunk_count and chunk_by_days are mutually exclusive",
                export.name
            );
        }
    }

    // --- Option validation: `chunk_by_key` excludes every other chunk-shape option ---
    if export.chunk_by_key.is_some() {
        for (conflict, name) in [
            (export.chunk_column.is_some(), "chunk_column"),
            (export.chunk_dense, "chunk_dense"),
            (export.chunk_by_days.is_some(), "chunk_by_days"),
            (export.chunk_count.is_some(), "chunk_count"),
        ] {
            if conflict {
                anyhow::bail!(
                    "export '{}': chunk_by_key and {} are mutually exclusive",
                    export.name,
                    name
                );
            }
        }
    }

    // Explicit per-export attempts win; otherwise derive them from the tuning retries
    // (retries + the initial attempt, never less than one).
    // NOTE(review): an explicit `chunk_max_attempts: 0` is passed through unclamped —
    // confirm it is rejected during config validation.
    let max_attempts = export
        .chunk_max_attempts
        .unwrap_or_else(|| tuning.max_retries.saturating_add(1).max(1));

    // Fast path: an explicit column with an explicit (row-count) chunk size needs no
    // introspection, so the plan is built without contacting the source.
    if let (Some(column), None) = (
        export.chunk_column.as_ref(),
        export.chunk_size_memory_mb.as_ref(),
    ) {
        return Ok(ExtractionStrategy::Chunked(ChunkedPlan {
            column: column.clone(),
            chunk_size: export.chunk_size,
            chunk_count: export.chunk_count,
            parallel: export.parallel,
            dense: export.chunk_dense,
            by_days: export.chunk_by_days,
            checkpoint: export.chunk_checkpoint,
            max_attempts,
        }));
    }

    // Everything below relies on table introspection, which needs the `table:` shortcut.
    // The error is tailored to whichever option made introspection necessary.
    let Some(tbl) = export.table.as_ref() else {
        if export.chunk_by_key.is_some() {
            anyhow::bail!(
                "export '{}': `chunk_by_key:` needs the `table:` shortcut so the planner can \
                 verify the key is backed by a unique index (an unindexed ORDER BY key would \
                 filesort the whole table). Set `table:` instead of `query:`.",
                export.name
            );
        }
        if export.chunk_size_memory_mb.is_some() {
            anyhow::bail!(
                "export '{}': `chunk_size_memory_mb:` only applies with the `table:` shortcut — \
                 set `table:` (preferred) or remove `chunk_size_memory_mb:` and keep an explicit `chunk_size:`",
                export.name
            );
        }
        anyhow::bail!(
            "export '{}': chunked mode requires 'chunk_column' \
             (auto-resolve from PK is only supported with the `table:` shortcut)",
            export.name
        );
    };

    let url = config.source.resolve_url().map_err(|e| {
        anyhow::anyhow!(
            "export '{}': chunked mode needs the source URL for the introspection probe: {e}",
            export.name
        )
    })?;

    // Probe the table with the driver matching the configured source type.
    let introspection = match config.source.source_type {
        crate::config::SourceType::Postgres => {
            crate::source::postgres::introspect_pg_table_for_chunking(
                &url,
                config.source.tls.as_ref(),
                tbl,
            )
        }
        crate::config::SourceType::Mysql => {
            crate::source::mysql::introspect_mysql_table_for_chunking(
                &url,
                config.source.tls.as_ref(),
                tbl,
            )
        }
        crate::config::SourceType::Mssql => {
            crate::source::mssql::introspect_mssql_table_for_chunking(
                &url,
                config.source.tls.as_ref(),
                tbl,
            )
        }
    }
    .map_err(|e| {
        anyhow::anyhow!(
            "export '{}': chunked-mode introspection probe failed: {e}. \
             Set `chunk_column:` (and `chunk_size:`) explicitly or check connectivity.",
            export.name
        )
    })?;

    // Chunk size: either derived from a memory budget and the probed average row width,
    // or the explicit row count from the export.
    let chunk_size = if let Some(mb) = export.chunk_size_memory_mb {
        // NOTE(review): the warning text mentions `pg_class` although this path is also
        // reached for MySQL / MSSQL sources — consider a source-neutral wording.
        let row_bytes = introspection
            .avg_row_bytes
            .filter(|b| *b > 0)
            .unwrap_or_else(|| {
                log::warn!(
                    "export '{}': chunk_size_memory_mb set but {} has no pg_class stats yet (run ANALYZE?) — \
                     defaulting to 512 B/row for sizing",
                    export.name,
                    tbl
                );
                512
            });
        // Saturate rather than overflow: an absurdly large budget would otherwise panic
        // in debug builds and wrap in release builds. `row_bytes` is > 0 by construction.
        let budget_bytes = (mb as i64).saturating_mul(1024 * 1024);
        let computed = (budget_bytes / row_bytes).max(1);
        let clamped = computed.clamp(10_000, 5_000_000) as usize;
        log::info!(
            "export '{}': chunk_size_memory_mb={} MB ÷ {} B/row ≈ {} rows (clamped to {})",
            export.name,
            mb,
            row_bytes,
            computed,
            clamped
        );
        clamped
    } else {
        export.chunk_size
    };

    // A table estimated to fit in one chunk is exported as a plain snapshot, unless the
    // user asked for a specific chunk shape.
    let explicit_chunk_shape = export.chunk_count.is_some()
        || export.chunk_by_days.is_some()
        || export.chunk_checkpoint
        || export.chunk_by_key.is_some();
    if !explicit_chunk_shape
        && introspection.row_estimate > 0
        && (introspection.row_estimate as usize) <= chunk_size
    {
        log::info!(
            "export '{}': {} has ~{} rows ≤ chunk_size {}; downgrading chunked → snapshot",
            export.name,
            tbl,
            introspection.row_estimate,
            chunk_size,
        );
        return Ok(ExtractionStrategy::Snapshot);
    }

    // Explicit keyset key: accept it only if the probe says it is usable.
    if let Some(key) = export.chunk_by_key.as_deref() {
        if !introspection.is_usable_keyset_key(key) {
            anyhow::bail!(
                "export '{}': chunk_by_key '{}' is not a usable keyset key on {} — it must be a \
                 single-column, NOT NULL, UNIQUE or PRIMARY key. Without a unique index, \
                 `ORDER BY {} LIMIT n` would full-scan + filesort the table. Add a unique index \
                 on it, pick another key, or use `mode: full` to accept one long snapshot query.",
                export.name,
                key,
                tbl,
                key
            );
        }
        log::info!(
            "export '{}': keyset (seek) pagination on '{}' (chunk_size={})",
            export.name,
            key,
            chunk_size
        );
        return Ok(ExtractionStrategy::Keyset(KeysetPlan {
            key_column: key.to_string(),
            chunk_size,
        }));
    }

    // Range chunking: use the explicit column, else the probed single-integer PK, else
    // (MySQL only) fall back to an auto-selected keyset key.
    let column = if let Some(c) = export.chunk_column.clone() {
        c
    } else {
        match introspection.single_int_pk.clone() {
            Some(col) => {
                log::warn!(
                    "export '{}': chunk_column not set — auto-resolved to '{}' \
                     from the single-integer primary key on {}. \
                     Set `chunk_column:` explicitly to pin the choice and silence this warning.",
                    export.name,
                    col,
                    tbl
                );
                col
            }
            None => {
                if config.source.source_type == crate::config::SourceType::Mysql
                    && let Some(key) = introspection.auto_keyset_key()
                {
                    log::warn!(
                        "export '{}': {} has no single-integer PK — auto-selected keyset \
                         (seek) pagination on unique key '{}'. Set `chunk_by_key:` explicitly \
                         to pin the choice and silence this warning.",
                        export.name,
                        tbl,
                        key
                    );
                    return Ok(ExtractionStrategy::Keyset(KeysetPlan {
                        key_column: key.to_string(),
                        chunk_size,
                    }));
                }
                anyhow::bail!(
                    "export '{}': chunked mode found no safe shape on {} — no single-integer PK \
                     to range-chunk and no single-column UNIQUE/PRIMARY key to keyset-page. \
                     Set `chunk_column:` (integer) or `chunk_by_key:` (unique key) explicitly, \
                     add a unique index, or use `mode: full` to accept one long snapshot query.",
                    export.name,
                    tbl
                );
            }
        }
    };

    Ok(ExtractionStrategy::Chunked(ChunkedPlan {
        column,
        chunk_size,
        chunk_count: export.chunk_count,
        parallel: export.parallel,
        dense: export.chunk_dense,
        by_days: export.chunk_by_days,
        checkpoint: export.chunk_checkpoint,
        max_attempts,
    }))
}
/// Public entry point for parsing an export's column type overrides.
///
/// Delegates directly to the private `parse_column_overrides`: `raw` maps a column name
/// to a type string, and `export_name` is only used to prefix error messages.
///
/// # Errors
/// Returns an error if any type string fails to parse.
pub fn parse_column_overrides_pub(
raw: &std::collections::HashMap<String, String>,
export_name: &str,
) -> Result<crate::types::ColumnOverrides> {
parse_column_overrides(raw, export_name)
}
/// Parses every `column -> type string` entry of `raw` into a typed override set.
///
/// Each type string goes through `crate::types::parse_type_str`. The first entry that
/// fails aborts the collection with an error naming the export and the column.
fn parse_column_overrides(
    raw: &std::collections::HashMap<String, String>,
    export_name: &str,
) -> Result<crate::types::ColumnOverrides> {
    raw.iter()
        .map(
            |(column, declared)| match crate::types::parse_type_str(declared) {
                Ok(parsed) => Ok((column.clone(), parsed)),
                Err(cause) => Err(anyhow::anyhow!(
                    "export '{}': column override for '{}': {}",
                    export_name,
                    column,
                    cause
                )),
            },
        )
        .collect()
}
/// Expands the placeholders in a destination config for the given export.
///
/// The placeholder context is built by `PlaceholderContext::for_today(export_name)` and
/// the substitution itself is done by `placeholder::expand_destination`.
pub(crate) fn expand_destination_templates(
    dest: crate::config::DestinationConfig,
    export_name: &str,
) -> crate::config::DestinationConfig {
    crate::destination::placeholder::expand_destination(
        dest,
        &crate::destination::placeholder::PlaceholderContext::for_today(export_name),
    )
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, IncrementalCursorMode, SourceConfig,
        SourceType, TimeColumnType,
    };

    // ---- fixtures ---------------------------------------------------------------------

    /// Postgres source with a literal local URL; every optional field is unset.
    fn minimal_source_config() -> SourceConfig {
        SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://localhost/test".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        }
    }

    /// Config with the minimal source and no exports of its own.
    fn minimal_config() -> Config {
        Config {
            source: minimal_source_config(),
            exports: vec![],
            notifications: None,
            parallel_exports: false,
            parallel_export_processes: false,
        }
    }

    /// The sample export named `test_export`, with Zstd compression and a local `./out`
    /// destination.
    fn minimal_export() -> ExportConfig {
        let mut export = crate::config::sample_export("test_export");
        export.compression = CompressionType::Zstd;
        export.destination = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("./out".into()),
            ..Default::default()
        };
        export
    }

    /// Minimal export switched to chunked mode with an explicit `id` chunk column.
    fn chunked_export() -> ExportConfig {
        ExportConfig {
            mode: ExportMode::Chunked,
            chunk_column: Some("id".into()),
            ..minimal_export()
        }
    }

    /// Runs `build_plan` against the minimal config with the given CLI flags.
    fn plan_with_flags(
        export: &ExportConfig,
        validate: bool,
        reconcile: bool,
        resume: bool,
    ) -> Result<ResolvedRunPlan> {
        build_plan(
            &minimal_config(),
            export,
            Path::new("."),
            validate,
            reconcile,
            resume,
            None,
        )
    }

    /// Runs `build_plan` against the minimal config with every CLI flag off.
    fn plan_for(export: &ExportConfig) -> Result<ResolvedRunPlan> {
        plan_with_flags(export, false, false, false)
    }

    // ---- strategy resolution per mode -------------------------------------------------

    #[test]
    fn snapshot_plan_from_full_mode() {
        let plan = plan_for(&minimal_export()).unwrap();

        assert!(matches!(plan.strategy, ExtractionStrategy::Snapshot));
        assert_eq!(plan.strategy.mode_label(), "full");
        assert_eq!(plan.export_name, "test_export");
        assert_eq!(plan.base_query, "SELECT 1");
        assert!(!plan.validate);
        assert!(!plan.reconcile);
        assert!(!plan.resume);
    }

    #[test]
    fn incremental_plan_resolves_cursor_column() {
        let mut export = minimal_export();
        export.mode = ExportMode::Incremental;
        export.cursor_column = Some("updated_at".into());

        let plan = plan_for(&export).unwrap();

        let ExtractionStrategy::Incremental(p) = &plan.strategy else {
            panic!("expected Incremental");
        };
        assert_eq!(p.primary_column, "updated_at");
        assert_eq!(p.mode, IncrementalCursorMode::SingleColumn);
        assert_eq!(plan.strategy.mode_label(), "incremental");
    }

    #[test]
    fn chunked_plan_resolves_all_fields() {
        let mut export = minimal_export();
        export.mode = ExportMode::Chunked;
        export.chunk_column = Some("id".into());
        export.chunk_size = 50_000;
        export.parallel = 4;
        export.chunk_checkpoint = true;
        export.chunk_max_attempts = Some(5);

        let plan = plan_with_flags(&export, true, false, false).unwrap();

        let ExtractionStrategy::Chunked(cp) = &plan.strategy else {
            panic!("expected Chunked");
        };
        assert_eq!(cp.column, "id");
        assert_eq!(cp.chunk_size, 50_000);
        assert_eq!(cp.parallel, 4);
        assert!(cp.checkpoint);
        assert_eq!(cp.max_attempts, 5);
        assert_eq!(plan.strategy.mode_label(), "chunked");
        assert!(plan.validate);
    }

    #[test]
    fn chunked_max_attempts_defaults_from_tuning() {
        let mut export = minimal_export();
        export.mode = ExportMode::Chunked;
        export.chunk_column = Some("id".into());

        let plan = plan_for(&export).unwrap();

        let ExtractionStrategy::Chunked(cp) = &plan.strategy else {
            panic!("expected Chunked");
        };
        assert_eq!(cp.max_attempts, 4);
    }

    #[test]
    fn chunk_by_key_conflicts_with_chunk_column() {
        let mut export = minimal_export();
        export.mode = ExportMode::Chunked;
        export.chunk_by_key = Some("uuid".into());
        export.chunk_column = Some("id".into());

        let err = plan_for(&export).unwrap_err();

        let msg = format!("{err:#}");
        assert!(msg.contains("mutually exclusive"), "got: {msg}");
    }

    #[test]
    fn chunk_by_key_without_table_requires_table_shortcut() {
        let mut export = minimal_export();
        export.mode = ExportMode::Chunked;
        export.chunk_by_key = Some("uuid".into());
        export.table = None;
        export.chunk_column = None;

        let err = plan_for(&export).unwrap_err();

        let msg = format!("{err:#}");
        assert!(
            msg.contains("table:") && msg.contains("chunk_by_key"),
            "should require the table shortcut to verify the index, got: {msg}"
        );
    }

    #[test]
    fn chunked_without_column_or_table_returns_explicit_error() {
        let mut export = minimal_export();
        export.mode = ExportMode::Chunked;
        export.chunk_column = None;
        export.table = None;

        let err = plan_for(&export).unwrap_err();

        let msg = format!("{err:#}");
        assert!(
            msg.contains("chunked mode requires 'chunk_column'")
                && msg.contains("`table:` shortcut"),
            "expected actionable error about table: shortcut; got: {msg}"
        );
    }

    #[test]
    fn chunked_explicit_column_skips_auto_resolve_and_no_db_call() {
        let mut export = minimal_export();
        export.mode = ExportMode::Chunked;
        export.chunk_column = Some("explicit_pk".into());
        export.table = Some("public.something".into());

        let plan =
            plan_for(&export).expect("explicit chunk_column must short-circuit auto-resolve");

        let ExtractionStrategy::Chunked(cp) = &plan.strategy else {
            panic!("expected Chunked");
        };
        assert_eq!(cp.column, "explicit_pk");
    }

    #[test]
    fn chunked_size_memory_mb_without_table_errors_with_hint() {
        let mut export = minimal_export();
        export.mode = ExportMode::Chunked;
        export.chunk_column = Some("id".into());
        export.chunk_size_memory_mb = Some(256);
        export.table = None;

        let err = plan_for(&export).unwrap_err();

        let msg = format!("{err:#}");
        assert!(
            msg.contains("chunk_size_memory_mb") && msg.contains("`table:` shortcut"),
            "expected actionable error mentioning chunk_size_memory_mb + table:; got: {msg}"
        );
    }

    #[test]
    fn time_window_plan_resolves_column_and_days() {
        let mut export = minimal_export();
        export.mode = ExportMode::TimeWindow;
        export.time_column = Some("created_at".into());
        export.time_column_type = TimeColumnType::Unix;
        export.days_window = Some(30);

        let plan = plan_for(&export).unwrap();

        let ExtractionStrategy::TimeWindow {
            column,
            column_type,
            days_window,
        } = &plan.strategy
        else {
            panic!("expected TimeWindow");
        };
        assert_eq!(column, "created_at");
        assert_eq!(*column_type, TimeColumnType::Unix);
        assert_eq!(*days_window, 30);
        assert_eq!(plan.strategy.mode_label(), "timewindow");
    }

    // ---- plan-level fields ------------------------------------------------------------

    #[test]
    fn plan_carries_cli_flags() {
        let plan = plan_with_flags(&minimal_export(), true, true, true).unwrap();

        assert!(plan.validate);
        assert!(plan.reconcile);
        assert!(plan.resume);
    }

    #[test]
    fn plan_resolves_tuning_profile_label() {
        let plan = plan_for(&minimal_export()).unwrap();

        assert_eq!(plan.tuning_profile_label, "balanced (default)");
    }

    // ---- chunk_count validation -------------------------------------------------------

    #[test]
    fn chunk_count_zero_is_rejected() {
        let mut export = chunked_export();
        export.chunk_count = Some(0);

        let err = plan_for(&export).unwrap_err();

        let msg = err.to_string();
        assert!(
            msg.contains("chunk_count") && msg.contains("1"),
            "expected 'chunk_count must be >= 1', got: {err}"
        );
    }

    #[test]
    fn chunk_count_with_chunk_dense_is_rejected() {
        let mut export = chunked_export();
        export.chunk_count = Some(10);
        export.chunk_dense = true;

        let err = plan_for(&export).unwrap_err();

        assert!(
            err.to_string().contains("mutually exclusive"),
            "expected 'mutually exclusive', got: {err}"
        );
    }

    #[test]
    fn chunk_count_with_chunk_by_days_is_rejected() {
        let mut export = chunked_export();
        export.chunk_count = Some(10);
        export.chunk_by_days = Some(7);

        let err = plan_for(&export).unwrap_err();

        assert!(
            err.to_string().contains("mutually exclusive"),
            "expected 'mutually exclusive', got: {err}"
        );
    }

    #[test]
    fn chunk_count_valid_threads_through_to_plan() {
        let mut export = chunked_export();
        export.chunk_count = Some(5);

        let plan = plan_for(&export).unwrap();

        let ExtractionStrategy::Chunked(cp) = &plan.strategy else {
            panic!("expected Chunked");
        };
        assert_eq!(cp.chunk_count, Some(5));
    }

    #[test]
    fn chunk_count_none_is_accepted_with_dense() {
        let mut export = chunked_export();
        export.chunk_count = None;
        export.chunk_dense = true;

        let plan = plan_for(&export).unwrap();

        let ExtractionStrategy::Chunked(cp) = &plan.strategy else {
            panic!("expected Chunked");
        };
        assert!(cp.dense);
        assert!(cp.chunk_count.is_none());
    }

    // ---- destination placeholder expansion --------------------------------------------

    #[test]
    fn expand_destination_templates_substitutes_all_placeholders() {
        let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
        let dest = DestinationConfig {
            destination_type: DestinationType::Local,
            prefix: Some("exports/{date}/{export}/".into()),
            path: Some("/data/{table}/{date}".into()),
            ..Default::default()
        };

        let expanded = expand_destination_templates(dest, "orders");

        let expected_path = format!("/data/orders/{today}");
        let expected_prefix = format!("exports/{today}/orders/");
        assert_eq!(expanded.path.as_deref(), Some(expected_path.as_str()));
        assert_eq!(expanded.prefix.as_deref(), Some(expected_prefix.as_str()));
    }

    #[test]
    fn expand_destination_templates_no_placeholders_unchanged() {
        let dest = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("./out".into()),
            ..Default::default()
        };

        let expanded = expand_destination_templates(dest, "orders");

        assert_eq!(expanded.path.as_deref(), Some("./out"));
        assert!(expanded.prefix.is_none());
    }

    #[test]
    fn expand_destination_templates_is_idempotent_on_already_expanded_strings() {
        let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
        let template = DestinationConfig {
            destination_type: DestinationType::Local,
            prefix: Some("runs/{date}/{export}/".into()),
            ..Default::default()
        };

        let once = expand_destination_templates(template, "orders");
        let twice = expand_destination_templates(once.clone(), "orders");

        assert_eq!(once.prefix, twice.prefix);
        let expected_prefix = format!("runs/{today}/orders/");
        assert_eq!(once.prefix.as_deref(), Some(expected_prefix.as_str()));
    }
}