use std::collections::HashMap;
use std::path::Path;
use crate::config::{Config, ExportConfig, ExportMode};
use crate::error::Result;
use crate::tuning::{SourceTuning, TuningProfile, merge_tuning_config};
use super::contract::{ChunkedPlan, ExtractionStrategy, IncrementalCursorPlan, ResolvedRunPlan};
pub fn build_plan(
config: &Config,
export: &ExportConfig,
config_dir: &Path,
validate: bool,
reconcile: bool,
resume: bool,
params: Option<&HashMap<String, String>>,
) -> Result<ResolvedRunPlan> {
let base_query = export.resolve_query(config_dir, params)?;
let merged = merge_tuning_config(config.source.tuning.as_ref(), export.tuning.as_ref());
let fallback_profile = config
.source
.environment
.map(|e| e.default_profile())
.unwrap_or(TuningProfile::Balanced);
let tuning = SourceTuning::from_config_with_default_profile(merged.as_ref(), fallback_profile);
let profile_label = |p: TuningProfile| match p {
TuningProfile::Fast => "fast",
TuningProfile::Balanced => "balanced",
TuningProfile::Safe => "safe",
};
let env_label = |e: crate::config::SourceEnvironment| match e {
crate::config::SourceEnvironment::Local => "local",
crate::config::SourceEnvironment::Replica => "replica",
crate::config::SourceEnvironment::Production => "production",
};
let tuning_profile_label = match merged.as_ref().and_then(|t| t.profile) {
Some(p) => profile_label(p).to_string(),
None => match config.source.environment {
Some(env) => format!(
"{} (default for environment: {})",
profile_label(fallback_profile),
env_label(env)
),
None => "balanced (default)".to_string(),
},
};
let strategy = match export.mode {
ExportMode::Full => ExtractionStrategy::Snapshot,
ExportMode::Incremental => {
let primary_column = export.cursor_column.clone().ok_or_else(|| {
anyhow::anyhow!(
"export '{}': incremental mode requires 'cursor_column'",
export.name
)
})?;
let fallback_column = export.cursor_fallback_column.clone();
let mode = export.incremental_cursor_mode;
ExtractionStrategy::Incremental(IncrementalCursorPlan {
primary_column,
fallback_column,
mode,
})
}
ExportMode::Chunked => resolve_chunked_strategy(config, export, &tuning)?,
ExportMode::TimeWindow => {
let column = export.time_column.clone().ok_or_else(|| {
anyhow::anyhow!(
"export '{}': time_window mode requires 'time_column'",
export.name
)
})?;
let days_window = export.days_window.ok_or_else(|| {
anyhow::anyhow!(
"export '{}': time_window mode requires 'days_window'",
export.name
)
})?;
ExtractionStrategy::TimeWindow {
column,
column_type: export.time_column_type,
days_window,
}
}
};
let (compression, compression_level) = export.effective_compression();
Ok(ResolvedRunPlan {
export_name: export.name.clone(),
base_query,
strategy,
format: export.format,
compression,
compression_level,
max_file_size_bytes: export.max_file_size_bytes(),
skip_empty: export.skip_empty,
meta_columns: export.meta_columns.clone(),
destination: expand_destination_templates(export.destination.clone(), &export.name),
quality: export.quality.clone(),
tuning,
tuning_profile_label,
validate,
reconcile,
resume,
source: config.source.clone(),
column_overrides: parse_column_overrides(&export.columns, &export.name)?,
schema_drift_policy: export.on_schema_drift,
shape_drift_warn_factor: export.shape_drift_warn_factor.unwrap_or(2.0),
parquet: export.parquet.clone(),
})
}
fn resolve_chunked_strategy(
config: &Config,
export: &ExportConfig,
tuning: &SourceTuning,
) -> Result<ExtractionStrategy> {
if let Some(count) = export.chunk_count {
if count == 0 {
anyhow::bail!("export '{}': chunk_count must be >= 1 (got 0)", export.name);
}
if export.chunk_dense {
anyhow::bail!(
"export '{}': chunk_count and chunk_dense are mutually exclusive",
export.name
);
}
if export.chunk_by_days.is_some() {
anyhow::bail!(
"export '{}': chunk_count and chunk_by_days are mutually exclusive",
export.name
);
}
}
let max_attempts = export
.chunk_max_attempts
.unwrap_or_else(|| tuning.max_retries.saturating_add(1).max(1));
if export.chunk_column.is_some() && export.chunk_size_memory_mb.is_none() {
return Ok(ExtractionStrategy::Chunked(ChunkedPlan {
column: export.chunk_column.clone().unwrap(),
chunk_size: export.chunk_size,
chunk_count: export.chunk_count,
parallel: export.parallel,
dense: export.chunk_dense,
by_days: export.chunk_by_days,
checkpoint: export.chunk_checkpoint,
max_attempts,
}));
}
let Some(tbl) = export.table.as_ref() else {
if export.chunk_size_memory_mb.is_some() {
anyhow::bail!(
"export '{}': `chunk_size_memory_mb:` only applies with the `table:` shortcut — \
set `table:` (preferred) or remove `chunk_size_memory_mb:` and keep an explicit `chunk_size:`",
export.name
);
}
anyhow::bail!(
"export '{}': chunked mode requires 'chunk_column' \
(auto-resolve from PK is only supported with the `table:` shortcut)",
export.name
);
};
let url = config.source.resolve_url().map_err(|e| {
anyhow::anyhow!(
"export '{}': chunked mode needs the source URL for the introspection probe: {e}",
export.name
)
})?;
let introspection = match config.source.source_type {
crate::config::SourceType::Postgres => {
crate::source::postgres::introspect_pg_table_for_chunking(
&url,
config.source.tls.as_ref(),
tbl,
)
}
crate::config::SourceType::Mysql => {
crate::source::mysql::introspect_mysql_table_for_chunking(
&url,
config.source.tls.as_ref(),
tbl,
)
}
}
.map_err(|e| {
anyhow::anyhow!(
"export '{}': chunked-mode introspection probe failed: {e}. \
Set `chunk_column:` (and `chunk_size:`) explicitly or check connectivity.",
export.name
)
})?;
let column = if let Some(c) = export.chunk_column.clone() {
c
} else {
match introspection.single_int_pk.clone() {
Some(col) => {
log::warn!(
"export '{}': chunk_column not set — auto-resolved to '{}' \
from the single-integer primary key on {}. \
Set `chunk_column:` explicitly to pin the choice and silence this warning.",
export.name,
col,
tbl
);
col
}
None => {
anyhow::bail!(
"export '{}': chunked mode requires 'chunk_column'. \
Tried auto-resolving from PK on {} but found none, a composite PK, \
or a non-integer PK — set `chunk_column:` explicitly.",
export.name,
tbl
);
}
}
};
let chunk_size = if let Some(mb) = export.chunk_size_memory_mb {
let row_bytes = introspection.avg_row_bytes.filter(|b| *b > 0).unwrap_or_else(|| {
log::warn!(
"export '{}': chunk_size_memory_mb set but {} has no pg_class stats yet (run ANALYZE?) — \
defaulting to 512 B/row for sizing",
export.name,
tbl
);
512
});
let computed = (mb as i64 * 1024 * 1024 / row_bytes).max(1);
let clamped = computed.clamp(10_000, 5_000_000) as usize;
log::info!(
"export '{}': chunk_size_memory_mb={} MB ÷ {} B/row ≈ {} rows (clamped to {})",
export.name,
mb,
row_bytes,
computed,
clamped
);
clamped
} else {
export.chunk_size
};
if introspection.row_estimate > 0 && (introspection.row_estimate as usize) <= chunk_size {
log::info!(
"export '{}': {} has ~{} rows ≤ chunk_size {}; downgrading chunked → snapshot",
export.name,
tbl,
introspection.row_estimate,
chunk_size,
);
return Ok(ExtractionStrategy::Snapshot);
}
Ok(ExtractionStrategy::Chunked(ChunkedPlan {
column,
chunk_size,
chunk_count: export.chunk_count,
parallel: export.parallel,
dense: export.chunk_dense,
by_days: export.chunk_by_days,
checkpoint: export.chunk_checkpoint,
max_attempts,
}))
}
pub fn parse_column_overrides_pub(
raw: &std::collections::HashMap<String, String>,
export_name: &str,
) -> Result<crate::types::ColumnOverrides> {
parse_column_overrides(raw, export_name)
}
/// Parses an export's `columns:` map (column name → type string) into typed overrides.
///
/// Entries are processed in column-name order. When several overrides are invalid,
/// the reported error is therefore deterministic instead of depending on `HashMap`
/// iteration order.
///
/// # Errors
///
/// Returns an error naming the export and the first column, in name order, whose type
/// string `crate::types::parse_type_str` rejects.
fn parse_column_overrides(
    raw: &std::collections::HashMap<String, String>,
    export_name: &str,
) -> Result<crate::types::ColumnOverrides> {
    let mut entries: Vec<(&String, &String)> = raw.iter().collect();
    // Keys are unique (they come from a map), so an unstable sort is sufficient.
    entries.sort_unstable_by(|a, b| a.0.cmp(b.0));
    entries
        .into_iter()
        .map(|(col, type_str)| {
            crate::types::parse_type_str(type_str)
                .map(|t| (col.clone(), t))
                .map_err(|e| {
                    anyhow::anyhow!(
                        "export '{}': column override for '{}': {}",
                        export_name,
                        col,
                        e
                    )
                })
        })
        .collect()
}
/// Expands placeholders in `dest` using a context built for today and `export_name`.
///
/// For example, `{date}` becomes today's `%Y-%m-%d` date, and `{export}` / `{table}`
/// become `export_name`. Strings without placeholders are returned unchanged.
pub(crate) fn expand_destination_templates(
    dest: crate::config::DestinationConfig,
    export_name: &str,
) -> crate::config::DestinationConfig {
    use crate::destination::placeholder::{expand_destination, PlaceholderContext};

    expand_destination(dest, &PlaceholderContext::for_today(export_name))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, IncrementalCursorMode,
        MetaColumns, SourceConfig, SourceType, TimeColumnType,
    };

    /// Postgres source with an inline URL and every optional knob unset.
    fn minimal_source_config() -> SourceConfig {
        SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://localhost/test".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        }
    }

    /// Config wrapping [`minimal_source_config`] with no exports or notifications.
    fn minimal_config() -> Config {
        Config {
            source: minimal_source_config(),
            exports: vec![],
            notifications: None,
            parallel_exports: false,
            parallel_export_processes: false,
        }
    }

    /// Full-mode export of `SELECT 1` to a local `./out` destination.
    fn minimal_export() -> ExportConfig {
        ExportConfig {
            name: "test_export".into(),
            query: Some("SELECT 1".into()),
            query_file: None,
            table: None,
            mode: ExportMode::Full,
            cursor_column: None,
            cursor_fallback_column: None,
            incremental_cursor_mode: IncrementalCursorMode::SingleColumn,
            chunk_column: None,
            chunk_size: 100_000,
            chunk_size_memory_mb: None,
            chunk_count: None,
            chunk_dense: false,
            chunk_by_days: None,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            compression_profile: None,
            skip_empty: false,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            source_group: None,
            reconcile_required: false,
            columns: Default::default(),
            on_schema_drift: Default::default(),
            shape_drift_warn_factor: None,
            parquet: None,
        }
    }

    /// Chunked export with an explicit `chunk_column`, so planning never probes a DB.
    fn chunked_export() -> ExportConfig {
        let mut e = minimal_export();
        e.mode = ExportMode::Chunked;
        e.chunk_column = Some("id".into());
        e
    }

    /// Plans `export` against [`minimal_config`] with the given CLI flags and no params.
    fn plan_with_flags(
        export: &ExportConfig,
        validate: bool,
        reconcile: bool,
        resume: bool,
    ) -> Result<ResolvedRunPlan> {
        build_plan(
            &minimal_config(),
            export,
            Path::new("."),
            validate,
            reconcile,
            resume,
            None,
        )
    }

    /// Plans `export` with every CLI flag off.
    fn default_plan(export: &ExportConfig) -> Result<ResolvedRunPlan> {
        plan_with_flags(export, false, false, false)
    }

    /// Plans `export`, expecting failure, and returns the full error chain as text.
    fn plan_error(export: &ExportConfig) -> String {
        let err = default_plan(export).unwrap_err();
        format!("{err:#}")
    }

    #[test]
    fn snapshot_plan_from_full_mode() {
        let plan = default_plan(&minimal_export()).unwrap();
        assert!(matches!(plan.strategy, ExtractionStrategy::Snapshot));
        assert_eq!(plan.strategy.mode_label(), "full");
        assert_eq!(plan.export_name, "test_export");
        assert_eq!(plan.base_query, "SELECT 1");
        assert!(!plan.validate);
        assert!(!plan.reconcile);
        assert!(!plan.resume);
    }

    #[test]
    fn incremental_plan_resolves_cursor_column() {
        let mut export = minimal_export();
        export.mode = ExportMode::Incremental;
        export.cursor_column = Some("updated_at".into());
        let plan = default_plan(&export).unwrap();
        match &plan.strategy {
            ExtractionStrategy::Incremental(p) => {
                assert_eq!(p.primary_column, "updated_at");
                assert_eq!(p.mode, IncrementalCursorMode::SingleColumn);
            }
            _ => panic!("expected Incremental"),
        }
        assert_eq!(plan.strategy.mode_label(), "incremental");
    }

    #[test]
    fn incremental_without_cursor_column_is_rejected() {
        let mut export = minimal_export();
        export.mode = ExportMode::Incremental;
        export.cursor_column = None;
        let msg = plan_error(&export);
        assert!(
            msg.contains("incremental mode requires 'cursor_column'"),
            "expected missing cursor_column error; got: {msg}"
        );
    }

    #[test]
    fn chunked_plan_resolves_all_fields() {
        let mut export = chunked_export();
        export.chunk_size = 50_000;
        export.parallel = 4;
        export.chunk_checkpoint = true;
        export.chunk_max_attempts = Some(5);
        let plan = plan_with_flags(&export, true, false, false).unwrap();
        match &plan.strategy {
            ExtractionStrategy::Chunked(cp) => {
                assert_eq!(cp.column, "id");
                assert_eq!(cp.chunk_size, 50_000);
                assert_eq!(cp.parallel, 4);
                assert!(cp.checkpoint);
                assert_eq!(cp.max_attempts, 5);
            }
            _ => panic!("expected Chunked"),
        }
        assert_eq!(plan.strategy.mode_label(), "chunked");
        assert!(plan.validate);
    }

    #[test]
    fn chunked_max_attempts_defaults_from_tuning() {
        let plan = default_plan(&chunked_export()).unwrap();
        match &plan.strategy {
            ExtractionStrategy::Chunked(cp) => assert_eq!(cp.max_attempts, 4),
            _ => panic!("expected Chunked"),
        }
    }

    #[test]
    fn chunked_without_column_or_table_returns_explicit_error() {
        let mut export = minimal_export();
        export.mode = ExportMode::Chunked;
        export.chunk_column = None;
        export.table = None;
        let msg = plan_error(&export);
        assert!(
            msg.contains("chunked mode requires 'chunk_column'")
                && msg.contains("`table:` shortcut"),
            "expected actionable error about table: shortcut; got: {msg}"
        );
    }

    #[test]
    fn chunked_explicit_column_skips_auto_resolve_and_no_db_call() {
        let mut export = minimal_export();
        export.mode = ExportMode::Chunked;
        export.chunk_column = Some("explicit_pk".into());
        export.table = Some("public.something".into());
        let plan =
            default_plan(&export).expect("explicit chunk_column must short-circuit auto-resolve");
        match &plan.strategy {
            ExtractionStrategy::Chunked(cp) => assert_eq!(cp.column, "explicit_pk"),
            _ => panic!("expected Chunked"),
        }
    }

    #[test]
    fn chunked_size_memory_mb_without_table_errors_with_hint() {
        let mut export = chunked_export();
        export.chunk_size_memory_mb = Some(256);
        export.table = None;
        let msg = plan_error(&export);
        assert!(
            msg.contains("chunk_size_memory_mb") && msg.contains("`table:` shortcut"),
            "expected actionable error mentioning chunk_size_memory_mb + table:; got: {msg}"
        );
    }

    #[test]
    fn time_window_plan_resolves_column_and_days() {
        let mut export = minimal_export();
        export.mode = ExportMode::TimeWindow;
        export.time_column = Some("created_at".into());
        export.time_column_type = TimeColumnType::Unix;
        export.days_window = Some(30);
        let plan = default_plan(&export).unwrap();
        match &plan.strategy {
            ExtractionStrategy::TimeWindow {
                column,
                column_type,
                days_window,
            } => {
                assert_eq!(column, "created_at");
                assert_eq!(*column_type, TimeColumnType::Unix);
                assert_eq!(*days_window, 30);
            }
            _ => panic!("expected TimeWindow"),
        }
        assert_eq!(plan.strategy.mode_label(), "timewindow");
    }

    #[test]
    fn time_window_without_time_column_is_rejected() {
        let mut export = minimal_export();
        export.mode = ExportMode::TimeWindow;
        export.days_window = Some(30);
        let msg = plan_error(&export);
        assert!(
            msg.contains("time_window mode requires 'time_column'"),
            "expected missing time_column error; got: {msg}"
        );
    }

    #[test]
    fn time_window_without_days_window_is_rejected() {
        let mut export = minimal_export();
        export.mode = ExportMode::TimeWindow;
        export.time_column = Some("created_at".into());
        let msg = plan_error(&export);
        assert!(
            msg.contains("time_window mode requires 'days_window'"),
            "expected missing days_window error; got: {msg}"
        );
    }

    #[test]
    fn plan_carries_cli_flags() {
        let plan = plan_with_flags(&minimal_export(), true, true, true).unwrap();
        assert!(plan.validate);
        assert!(plan.reconcile);
        assert!(plan.resume);
    }

    #[test]
    fn plan_resolves_tuning_profile_label() {
        let plan = default_plan(&minimal_export()).unwrap();
        assert_eq!(plan.tuning_profile_label, "balanced (default)");
    }

    #[test]
    fn expand_destination_templates_substitutes_all_placeholders() {
        let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
        let dest = DestinationConfig {
            destination_type: DestinationType::Local,
            prefix: Some("exports/{date}/{export}/".into()),
            path: Some("/data/{table}/{date}".into()),
            ..Default::default()
        };
        let expanded = expand_destination_templates(dest, "orders");
        assert_eq!(
            expanded.path.as_deref(),
            Some(format!("/data/orders/{today}").as_str())
        );
        assert_eq!(
            expanded.prefix.as_deref(),
            Some(format!("exports/{today}/orders/").as_str())
        );
    }

    #[test]
    fn chunk_count_zero_is_rejected() {
        let mut export = chunked_export();
        export.chunk_count = Some(0);
        let msg = plan_error(&export);
        assert!(
            msg.contains("chunk_count must be >= 1"),
            "expected 'chunk_count must be >= 1', got: {msg}"
        );
    }

    #[test]
    fn chunk_count_with_chunk_dense_is_rejected() {
        let mut export = chunked_export();
        export.chunk_count = Some(10);
        export.chunk_dense = true;
        let msg = plan_error(&export);
        assert!(
            msg.contains("chunk_count and chunk_dense are mutually exclusive"),
            "expected 'mutually exclusive' naming chunk_dense, got: {msg}"
        );
    }

    #[test]
    fn chunk_count_with_chunk_by_days_is_rejected() {
        let mut export = chunked_export();
        export.chunk_count = Some(10);
        export.chunk_by_days = Some(7);
        let msg = plan_error(&export);
        assert!(
            msg.contains("chunk_count and chunk_by_days are mutually exclusive"),
            "expected 'mutually exclusive' naming chunk_by_days, got: {msg}"
        );
    }

    #[test]
    fn chunk_count_valid_threads_through_to_plan() {
        let mut export = chunked_export();
        export.chunk_count = Some(5);
        let plan = default_plan(&export).unwrap();
        match &plan.strategy {
            ExtractionStrategy::Chunked(cp) => assert_eq!(cp.chunk_count, Some(5)),
            _ => panic!("expected Chunked"),
        }
    }

    #[test]
    fn chunk_count_none_is_accepted_with_dense() {
        let mut export = chunked_export();
        export.chunk_count = None;
        export.chunk_dense = true;
        let plan = default_plan(&export).unwrap();
        match &plan.strategy {
            ExtractionStrategy::Chunked(cp) => {
                assert!(cp.dense);
                assert!(cp.chunk_count.is_none());
            }
            _ => panic!("expected Chunked"),
        }
    }

    #[test]
    fn expand_destination_templates_no_placeholders_unchanged() {
        let dest = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("./out".into()),
            ..Default::default()
        };
        let expanded = expand_destination_templates(dest, "orders");
        assert_eq!(expanded.path.as_deref(), Some("./out"));
        assert!(expanded.prefix.is_none());
    }

    #[test]
    fn expand_destination_templates_is_idempotent_on_already_expanded_strings() {
        let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
        let once = expand_destination_templates(
            DestinationConfig {
                destination_type: DestinationType::Local,
                prefix: Some("runs/{date}/{export}/".into()),
                ..Default::default()
            },
            "orders",
        );
        let twice = expand_destination_templates(once.clone(), "orders");
        assert_eq!(once.prefix, twice.prefix);
        assert_eq!(
            once.prefix.as_deref(),
            Some(format!("runs/{today}/orders/").as_str())
        );
    }
}