use super::HealthVerdict;
use crate::config::{ExportConfig, ExportMode};
/// Builds a short, human-readable label for the extraction strategy implied
/// by `export`.
///
/// The label encodes the export mode plus the settings that shape it
/// (column name, chunk size or day span, and worker count when
/// `parallel > 1`). A column that is not configured is rendered as `?`, and a
/// missing `days_window` is rendered as `0`.
pub(crate) fn derive_strategy(export: &ExportConfig) -> String {
    let workers = export.parallel;
    let is_parallel = workers > 1;

    match export.mode {
        ExportMode::Full if is_parallel => format!("full-parallel({workers})"),
        ExportMode::Full => "full-scan".to_string(),
        ExportMode::Incremental => {
            let column = export.cursor_column.as_deref().unwrap_or("?");
            format!("incremental({column})")
        }
        ExportMode::Chunked => {
            let column = export.chunk_column.as_deref().unwrap_or("?");
            let size = export.chunk_size;
            // Date-based chunking takes precedence over size-based chunking.
            match (export.chunk_by_days, is_parallel) {
                (Some(days), true) => {
                    format!("date-chunked-parallel({column}, {days}d, p={workers})")
                }
                (Some(days), false) => format!("date-chunked({column}, {days}d)"),
                (None, true) => format!("chunked-parallel({column}, size={size}, p={workers})"),
                (None, false) => format!("chunked({column}, size={size})"),
            }
        }
        ExportMode::TimeWindow => {
            let column = export.time_column.as_deref().unwrap_or("?");
            let days = export.days_window.unwrap_or(0);
            format!("time-window({column}, {days}d)")
        }
    }
}
/// Recommends a tuning profile name (`"fast"`, `"balanced"` or `"safe"`).
///
/// The choice depends on the estimated row count (a missing estimate counts
/// as 0), on whether the query uses an index, and — for small un-indexed
/// datasets only — on whether the export runs with more than one worker.
pub(crate) fn recommend_profile(
    row_estimate: Option<i64>,
    uses_index: bool,
    export: &ExportConfig,
) -> &'static str {
    let estimated = row_estimate.unwrap_or(0);

    if uses_index {
        return if estimated <= 1_000_000 {
            "fast"
        } else if estimated <= 10_000_000 {
            "balanced"
        } else {
            "safe"
        };
    }

    // No index from here on.
    if estimated <= 100_000 {
        if export.parallel > 1 {
            "safe"
        } else {
            "balanced"
        }
    } else if estimated <= 1_000_000 {
        "balanced"
    } else {
        "safe"
    }
}
/// Result of the key-range sparsity heuristic computed by
/// [`chunk_sparsity_from_counts`].
#[derive(Debug, Clone, Copy)]
pub(crate) struct ChunkSparsityInfo {
    /// `true` when there is at least one row, `density` is below 0.1 and the
    /// range spans more than 10 chunk windows.
    pub is_sparse: bool,
    /// `row_count / range_span`.
    pub density: f64,
    /// `max - min` of the chunk key, saturated to the `i64` range and never
    /// less than 1.
    pub range_span: i64,
    /// Number of `chunk_size`-wide windows needed to cover `range_span`
    /// (rounded up); 0 when `chunk_size` is 0.
    pub logical_windows: i64,
    /// The row count the heuristic was computed from.
    pub row_count: i64,
}

/// Estimates how sparsely `row_count` rows populate the key range
/// `min_i..max_i` when it is cut into windows of `chunk_size` keys.
///
/// The computation never overflows: the span saturates at `i64::MAX` for
/// extreme bounds, and the window count is derived with 128-bit arithmetic so
/// that any `usize` chunk size is handled exactly. A `chunk_size` of 0 yields
/// zero windows (and therefore never reports a sparse range).
pub(crate) fn chunk_sparsity_from_counts(
    row_count: i64,
    min_i: i64,
    max_i: i64,
    chunk_size: usize,
) -> ChunkSparsityInfo {
    // `max_i - min_i` can exceed i64 (e.g. MIN..MAX); saturate instead of
    // panicking in debug builds or wrapping to a bogus span in release builds.
    let range_span = max_i.saturating_sub(min_i).max(1);
    let density = row_count as f64 / range_span as f64;

    let logical_windows = if chunk_size == 0 {
        0
    } else {
        // Ceiling division in i128: both widenings are lossless and the sum
        // cannot overflow. The quotient is at most `range_span`, so narrowing
        // back to i64 is lossless as well.
        let span = i128::from(range_span);
        let size = chunk_size as i128;
        ((span + size - 1) / size) as i64
    };

    let is_sparse = row_count > 0 && density < 0.1 && logical_windows > 10;

    ChunkSparsityInfo {
        is_sparse,
        density,
        range_span,
        logical_windows,
        row_count,
    }
}
/// Returns a warning when a size-chunked export is about to walk a sparsely
/// populated key range.
///
/// The check only applies to chunked mode without `chunk_dense` and without
/// `chunk_by_days`. It yields `None` when the row estimate is missing or
/// zero, when either bound is missing or does not parse as an `i64`, or when
/// the range is not classified as sparse by `chunk_sparsity_from_counts`.
pub(crate) fn check_sparse_range(
    export: &ExportConfig,
    row_estimate: Option<i64>,
    cursor_min: Option<&str>,
    cursor_max: Option<&str>,
) -> Option<String> {
    let applicable = export.mode == ExportMode::Chunked
        && !export.chunk_dense
        && export.chunk_by_days.is_none();
    if !applicable {
        return None;
    }

    // A missing estimate is treated like an estimate of zero: nothing to report.
    let rows = row_estimate.filter(|&count| count != 0)?;
    let min_val = cursor_min?;
    let max_val = cursor_max?;
    let lower: i64 = min_val.parse().ok()?;
    let upper: i64 = max_val.parse().ok()?;

    let sparsity = chunk_sparsity_from_counts(rows, lower, upper, export.chunk_size);
    if !sparsity.is_sparse {
        return None;
    }

    let empty_fraction = (1.0 - sparsity.density).clamp(0.0, 1.0);
    let empty_pct = (empty_fraction * 100.0) as u32;
    Some(format!(
        "Sparse key range: ~{empty_pct}% of chunk windows will be empty (range {min_val}..{max_val}, ~{rows} rows). \
         Consider chunking on a dense surrogate (ROW_NUMBER) or switching to incremental mode."
    ))
}
/// Warns about the sorting cost of a dense surrogate key in chunked mode.
///
/// A warning is produced when the export is chunked and either `chunk_dense`
/// is set or the configured query text contains `ROW_NUMBER`
/// (case-insensitively). Otherwise returns `None`.
pub(crate) fn check_dense_surrogate_cost(export: &ExportConfig) -> Option<String> {
    if export.mode != ExportMode::Chunked {
        return None;
    }

    let query_uses_row_number = export
        .query
        .as_deref()
        .map_or(false, |sql| sql.to_uppercase().contains("ROW_NUMBER"));
    if !(query_uses_row_number || export.chunk_dense) {
        return None;
    }

    Some(
        "Dense surrogate (ROW_NUMBER) requires a global sort -- this adds CPU and I/O cost \
         proportional to the full result set. For very large or hot tables, consider \
         incremental mode on an indexed cursor column, or a precomputed dense key."
            .to_string(),
    )
}
/// Number of connections subtracted from `max_connections` when suggesting a
/// reduced `parallel` value, leaving room for other clients.
const CONNECTION_HEADROOM: u32 = 3;

/// Warns when the configured worker count cannot be satisfied by the
/// database's connection limit.
///
/// * `parallel <= 1` — no check is needed, returns `None`.
/// * `db_max_connections` unknown — returns a note that the check was skipped.
/// * `parallel >= max_connections` — returns a warning with a suggested
///   upper bound (`max_connections - CONNECTION_HEADROOM`, at least 1).
/// * otherwise — returns `None`.
pub(crate) fn check_connection_limit(
    parallel: usize,
    db_max_connections: Option<u32>,
) -> Option<String> {
    if parallel <= 1 {
        return None;
    }
    match db_max_connections {
        None => Some(
            "Could not fetch DB max_connections — connection limit check skipped. \
             Ask your DBA to verify your user has access to DB server variables, \
             then verify your parallel setting manually."
                .to_string(),
        ),
        // Compare in u64: narrowing `parallel` to u32 would silently truncate
        // very large values and could hide the warning, whereas widening both
        // operands is lossless.
        Some(max_conn) if parallel as u64 >= u64::from(max_conn) => Some(format!(
            "parallel={parallel} meets or exceeds DB max_connections={max_conn} — \
             workers will compete for connections and some may fail. \
             Reduce parallel to at most {} (leave headroom for other connections).",
            max_conn.saturating_sub(CONNECTION_HEADROOM).max(1),
        )),
        Some(_) => None,
    }
}
/// Warns about memory pressure when a parallel export targets a large table.
///
/// Returns a warning only when `parallel > 1` and the row estimate exceeds
/// five million rows; a missing estimate counts as 0.
pub(crate) fn check_parallel_memory_risk(
    export: &ExportConfig,
    row_estimate: Option<i64>,
) -> Option<String> {
    let workers = export.parallel;
    let rows = row_estimate.unwrap_or(0);
    if workers <= 1 || rows <= 5_000_000 {
        return None;
    }

    let millions = rows / 1_000_000;
    Some(format!(
        "Parallel={workers} on ~{millions}M rows: each worker buffers batch_size rows in memory. \
         With wide rows this can cause high RSS. Monitor with memory_threshold_mb \
         or reduce parallel/batch_size."
    ))
}
/// Suggests a worker count together with a one-line rationale.
///
/// Only chunked exports ever get more than one worker. Small datasets
/// (fewer than 50,000 estimated rows) stay single-threaded; un-indexed
/// sources get at most 2 workers; indexed sources get 2 or 4 depending on
/// size. A missing row estimate counts as 0.
pub(crate) fn recommend_parallelism(
    export: &ExportConfig,
    row_estimate: Option<i64>,
    uses_index: bool,
) -> (u32, &'static str) {
    if export.mode != ExportMode::Chunked {
        return (1, "only chunked mode benefits from parallelism");
    }

    let estimated = row_estimate.unwrap_or(0);
    if estimated < 50_000 {
        return (1, "dataset too small to benefit from parallelism");
    }

    if !uses_index {
        return if estimated > 5_000_000 {
            (1, "no index — parallel scans would multiply source load")
        } else {
            (
                2,
                "no index — conservative parallelism to limit source impact",
            )
        };
    }

    if estimated < 500_000 {
        (2, "moderate dataset — 2 workers sufficient")
    } else if estimated < 5_000_000 {
        (4, "large dataset with index support")
    } else {
        (
            4,
            "very large dataset — cap at 4 to control memory; increase with memory_threshold_mb monitoring",
        )
    }
}
/// Runs every warning check and returns the messages that fired.
///
/// The order is fixed: connection limit, sparse key range, dense surrogate
/// cost, then parallel memory risk.
pub(super) fn collect_warnings(
    export: &ExportConfig,
    row_estimate: Option<i64>,
    chunk_min: Option<&str>,
    chunk_max: Option<&str>,
    db_max_connections: Option<u32>,
) -> Vec<String> {
    let mut found = Vec::new();
    // `Option` is iterable, so extending with it appends zero or one message.
    found.extend(check_connection_limit(export.parallel, db_max_connections));
    found.extend(check_sparse_range(
        export,
        row_estimate,
        chunk_min,
        chunk_max,
    ));
    found.extend(check_dense_surrogate_cost(export));
    found.extend(check_parallel_memory_risk(export, row_estimate));
    found
}
/// Classifies the health of an export.
///
/// The verdict depends on index usage, whether a cursor is available and the
/// estimated row count (a missing estimate counts as 0):
///
/// * indexed + cursor + at most 10M rows → `Efficient`
/// * indexed otherwise → `Acceptable`
/// * un-indexed with at most 1M rows, or with a cursor and at most 50M rows
///   → `Degraded`
/// * every other un-indexed case → `Unsafe`
pub(crate) fn compute_verdict(
    row_estimate: Option<i64>,
    uses_index: bool,
    has_cursor: bool,
) -> HealthVerdict {
    let estimated = row_estimate.unwrap_or(0);

    if uses_index {
        if has_cursor && estimated <= 10_000_000 {
            HealthVerdict::Efficient
        } else {
            HealthVerdict::Acceptable
        }
    } else {
        let tolerable = estimated <= 1_000_000 || (has_cursor && estimated <= 50_000_000);
        if tolerable {
            HealthVerdict::Degraded
        } else {
            HealthVerdict::Unsafe
        }
    }
}
/// Builds the remediation text shown alongside a health verdict.
///
/// * `Efficient` never produces a suggestion.
/// * `Acceptable` produces one only above 10M estimated rows.
/// * `Degraded` and `Unsafe` always produce one, tailored to the export mode
///   and to whether an index is used.
///
/// A missing row estimate counts as 0. Sentences are joined with single
/// spaces.
pub(crate) fn build_suggestion(
    verdict: &HealthVerdict,
    row_estimate: Option<i64>,
    uses_index: bool,
    export: &ExportConfig,
) -> Option<String> {
    let rows = row_estimate.unwrap_or(0);
    let millions = rows / 1_000_000;

    match verdict {
        HealthVerdict::Efficient => None,
        HealthVerdict::Acceptable if rows <= 10_000_000 => None,
        HealthVerdict::Acceptable => {
            // Each piece of advice carries its own leading space so it can be
            // appended directly to the headline.
            let advice = match export.mode {
                ExportMode::Full => {
                    " Switch to incremental mode with an indexed cursor column to avoid re-reading unchanged rows."
                }
                ExportMode::Chunked if export.parallel <= 1 => {
                    " Add parallel > 1 to speed up chunked extraction."
                }
                _ => " Use 'safe' tuning profile to limit database impact.",
            };
            Some(format!("Large dataset (~{millions}M rows).{advice}"))
        }
        HealthVerdict::Degraded => {
            let mode_hint: Option<String> = match export.mode {
                ExportMode::Full if export.cursor_column.is_none() => Some(
                    "Add an indexed cursor column and switch to incremental mode.".to_string(),
                ),
                ExportMode::Chunked if !uses_index => {
                    let column = export.chunk_column.as_deref().unwrap_or("chunk_column");
                    Some(format!(
                        "Create an index on '{column}' to speed up range scans."
                    ))
                }
                ExportMode::TimeWindow if !uses_index => {
                    let column = export.time_column.as_deref().unwrap_or("time_column");
                    Some(format!(
                        "Create an index on '{column}' for efficient time-window filtering."
                    ))
                }
                _ => {
                    let wants_cursor =
                        export.cursor_column.is_none() && export.mode != ExportMode::Chunked;
                    wants_cursor
                        .then(|| "Consider adding a cursor column for incremental mode.".to_string())
                }
            };

            let mut sentences: Vec<String> = Vec::new();
            if !uses_index {
                sentences.push("No index detected -- full table scan.".to_string());
            }
            sentences.extend(mode_hint);
            sentences.push("Use 'safe' tuning profile to limit database impact.".to_string());
            Some(sentences.join(" "))
        }
        HealthVerdict::Unsafe => {
            let mode_hint = match export.mode {
                ExportMode::Full => {
                    "Add an indexed cursor column and use incremental mode to avoid full re-reads."
                        .to_string()
                }
                ExportMode::Chunked => {
                    let column = export.chunk_column.as_deref().unwrap_or("chunk_column");
                    format!(
                        "Create an index on '{column}'. Consider reducing chunk_size or adding parallel workers."
                    )
                }
                ExportMode::TimeWindow => {
                    let column = export.time_column.as_deref().unwrap_or("time_column");
                    format!("Create an index on '{column}'. Reduce days_window if possible.")
                }
                ExportMode::Incremental => {
                    let column = export.cursor_column.as_deref().unwrap_or("cursor_column");
                    format!("Create an index on '{column}'.")
                }
            };

            let sentences = vec![
                format!("~{millions}M row scan without index support."),
                mode_hint,
                "Use 'safe' tuning profile. Extract during off-peak hours.".to_string(),
            ];
            Some(sentences.join(" "))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ExportConfig;

    /// Parses an `ExportConfig` from a minimal YAML document with
    /// `extra_yaml` appended to it.
    fn cfg(extra_yaml: &str) -> ExportConfig {
        let document = format!(
            "name: test\nformat: parquet\ndestination:\n type: local\n path: /tmp\n{extra_yaml}"
        );
        serde_yaml_ng::from_str(&document).expect("parse ExportConfig")
    }

    /// Strategy label for a config built from `extra_yaml`.
    fn strategy_of(extra_yaml: &str) -> String {
        derive_strategy(&cfg(extra_yaml))
    }

    /// Profile recommendation against a default config.
    fn profile_for(rows: i64, uses_index: bool) -> &'static str {
        recommend_profile(Some(rows), uses_index, &cfg(""))
    }

    /// Sparse-range check for 100 rows spread over keys 1..1000000.
    fn sparse_check_on_wide_range(extra_yaml: &str) -> Option<String> {
        check_sparse_range(&cfg(extra_yaml), Some(100), Some("1"), Some("1000000"))
    }

    // --- derive_strategy ---------------------------------------------------

    #[test]
    fn derive_strategy_full_scan() {
        assert_eq!(strategy_of("mode: full\n"), "full-scan");
    }

    #[test]
    fn derive_strategy_full_parallel() {
        assert_eq!(strategy_of("mode: full\nparallel: 4\n"), "full-parallel(4)");
    }

    #[test]
    fn derive_strategy_incremental() {
        assert_eq!(
            strategy_of("mode: incremental\ncursor_column: updated_at\n"),
            "incremental(updated_at)"
        );
    }

    #[test]
    fn derive_strategy_chunked_by_size() {
        assert_eq!(
            strategy_of("mode: chunked\nchunk_column: id\nchunk_size: 50000\n"),
            "chunked(id, size=50000)"
        );
    }

    #[test]
    fn derive_strategy_date_chunked() {
        assert_eq!(
            strategy_of("mode: chunked\nchunk_column: created_at\nchunk_by_days: 7\n"),
            "date-chunked(created_at, 7d)"
        );
    }

    // --- recommend_profile -------------------------------------------------

    #[test]
    fn recommend_profile_indexed_small_is_fast() {
        assert_eq!(profile_for(500_000, true), "fast");
    }

    #[test]
    fn recommend_profile_indexed_medium_is_balanced() {
        assert_eq!(profile_for(5_000_000, true), "balanced");
    }

    #[test]
    fn recommend_profile_indexed_large_is_safe() {
        assert_eq!(profile_for(15_000_000, true), "safe");
    }

    #[test]
    fn recommend_profile_no_index_small_is_balanced() {
        assert_eq!(profile_for(50_000, false), "balanced");
    }

    #[test]
    fn recommend_profile_no_index_large_is_safe() {
        assert_eq!(profile_for(5_000_000, false), "safe");
    }

    // --- chunk_sparsity_from_counts ----------------------------------------

    #[test]
    fn chunk_sparsity_dense_range_not_sparse() {
        let info = chunk_sparsity_from_counts(10_000, 0, 10_000, 100);
        assert!(!info.is_sparse);
        assert!(info.density > 0.9);
    }

    #[test]
    fn chunk_sparsity_very_sparse_range_is_sparse() {
        let info = chunk_sparsity_from_counts(100, 0, 1_000_000, 50_000);
        assert!(
            info.is_sparse,
            "expected sparse: density={:.6}",
            info.density
        );
        assert!(info.density < 0.1);
        assert!(info.logical_windows > 10);
    }

    #[test]
    fn chunk_sparsity_zero_rows_never_sparse() {
        let info = chunk_sparsity_from_counts(0, 0, 1_000_000, 100);
        assert!(!info.is_sparse);
    }

    // --- check_sparse_range ------------------------------------------------

    #[test]
    fn check_sparse_range_non_chunked_mode_returns_none() {
        assert_eq!(sparse_check_on_wide_range("mode: full\n"), None);
    }

    #[test]
    fn check_sparse_range_chunk_dense_returns_none() {
        assert_eq!(
            sparse_check_on_wide_range("mode: chunked\nchunk_column: id\nchunk_dense: true\n"),
            None
        );
    }

    #[test]
    fn check_sparse_range_chunk_by_days_returns_none() {
        assert_eq!(
            sparse_check_on_wide_range(
                "mode: chunked\nchunk_column: created_at\nchunk_by_days: 7\n"
            ),
            None
        );
    }

    #[test]
    fn check_sparse_range_sparse_returns_warning() {
        let e = cfg("mode: chunked\nchunk_column: id\nchunk_size: 100000\n");
        let w = check_sparse_range(&e, Some(100), Some("1"), Some("5000000"));
        assert!(w.is_some(), "expected sparse warning");
        let msg = w.unwrap();
        assert!(msg.contains("Sparse"), "got: {msg}");
    }

    #[test]
    fn check_sparse_range_dense_range_returns_none() {
        let e = cfg("mode: chunked\nchunk_column: id\nchunk_size: 100\n");
        let outcome = check_sparse_range(&e, Some(10_000), Some("0"), Some("10000"));
        assert_eq!(outcome, None);
    }

    // --- check_dense_surrogate_cost ----------------------------------------

    #[test]
    fn check_dense_surrogate_row_number_in_query_returns_warning() {
        let e = cfg(
            "mode: chunked\nchunk_column: id\nquery: \"SELECT ROW_NUMBER() OVER (ORDER BY id) FROM t\"\n",
        );
        assert_ne!(check_dense_surrogate_cost(&e), None);
    }

    #[test]
    fn check_dense_surrogate_chunk_dense_flag_returns_warning() {
        let e = cfg("mode: chunked\nchunk_column: id\nchunk_dense: true\n");
        assert_ne!(check_dense_surrogate_cost(&e), None);
    }

    #[test]
    fn check_dense_surrogate_normal_chunked_returns_none() {
        let e = cfg("mode: chunked\nchunk_column: id\n");
        assert_eq!(check_dense_surrogate_cost(&e), None);
    }

    // --- check_connection_limit --------------------------------------------

    #[test]
    fn check_connection_limit_parallel_one_returns_none() {
        assert_eq!(check_connection_limit(1, Some(100)), None);
    }

    #[test]
    fn check_connection_limit_unknown_max_conn_returns_warning() {
        let w = check_connection_limit(4, None).unwrap();
        assert!(
            w.contains("max_connections") || w.contains("limit check"),
            "got: {w}"
        );
    }

    #[test]
    fn check_connection_limit_exceeds_max_conn_warns() {
        let w = check_connection_limit(10, Some(8)).unwrap();
        assert!(w.contains("10") && w.contains("8"), "got: {w}");
    }

    #[test]
    fn check_connection_limit_within_limit_returns_none() {
        assert_eq!(check_connection_limit(4, Some(100)), None);
    }

    // --- check_parallel_memory_risk ----------------------------------------

    #[test]
    fn check_parallel_memory_risk_parallel_one_returns_none() {
        let e = cfg("parallel: 1\n");
        assert_eq!(check_parallel_memory_risk(&e, Some(10_000_000)), None);
    }

    #[test]
    fn check_parallel_memory_risk_small_rows_returns_none() {
        let e = cfg("parallel: 4\n");
        assert_eq!(check_parallel_memory_risk(&e, Some(100_000)), None);
    }

    #[test]
    fn check_parallel_memory_risk_large_rows_returns_warning() {
        let e = cfg("parallel: 4\n");
        let w = check_parallel_memory_risk(&e, Some(6_000_000)).unwrap();
        assert!(
            w.contains("Parallel=4") || w.contains("arallel"),
            "got: {w}"
        );
    }

    // --- compute_verdict ---------------------------------------------------

    #[test]
    fn compute_verdict_indexed_cursor_small_is_efficient() {
        let verdict = compute_verdict(Some(1_000_000), true, true);
        assert!(matches!(verdict, HealthVerdict::Efficient));
    }

    #[test]
    fn compute_verdict_indexed_no_cursor_small_is_acceptable() {
        let verdict = compute_verdict(Some(1_000_000), true, false);
        assert!(matches!(verdict, HealthVerdict::Acceptable));
    }

    #[test]
    fn compute_verdict_no_index_small_is_degraded() {
        let verdict = compute_verdict(Some(500_000), false, true);
        assert!(matches!(verdict, HealthVerdict::Degraded));
    }

    #[test]
    fn compute_verdict_no_index_very_large_is_unsafe() {
        let verdict = compute_verdict(Some(5_000_000), false, false);
        assert!(matches!(verdict, HealthVerdict::Unsafe));
    }
}