// floe-core 0.3.1
//
// Core library for Floe, a YAML-driven technical ingestion tool.
// Documentation
use std::collections::HashMap;

use polars::prelude::{DataFrame, DataType, Series};

use crate::errors::RunError;
use crate::{config, report, ConfigError, FloeResult};

/// Maximum number of missing/extra column names copied into a `report::FileMismatch`.
/// The full, untruncated lists are still returned on `MismatchOutcome::missing` / `extra`.
const MAX_MISMATCH_COLUMNS: usize = 50;

/// Result of comparing declared columns against the input columns of one file,
/// as produced by `plan_schema_mismatch`.
#[derive(Debug, Clone)]
pub struct MismatchOutcome {
    /// Per-file mismatch report (missing/extra lists capped at `MAX_MISMATCH_COLUMNS`).
    pub report: report::FileMismatch,
    /// True when a `reject_file` policy fired and `policy.severity` is `reject`.
    pub rejected: bool,
    /// True when a `reject_file` policy fired and `policy.severity` is `abort`.
    pub aborted: bool,
    /// 1 when a `reject_file` request was downgraded because `policy.severity` is `warn`, else 0.
    pub warnings: u64,
    /// 1 when the file was rejected or aborted, else 0.
    pub errors: u64,
    /// Sorted declared column names absent from the (normalized) input, untruncated.
    pub missing: Vec<String>,
    /// Sorted (normalized) input column names that were not declared, untruncated.
    pub extra: Vec<String>,
    /// Whether missing columns should be added as all-null columns.
    pub fill_missing: bool,
    /// Whether extra columns should be dropped.
    pub ignore_extra: bool,
}

/// Returns the declared columns whose source selector addresses a top-level field.
///
/// Selectors containing `.`, `[`, `/` or `@` (nested paths) are skipped. Every kept
/// selector is normalized with `normalize_strategy` when one is given; the result
/// becomes the column `name` and `source` is cleared. The type, nullability,
/// uniqueness, width and trim settings are copied from the original column.
///
/// # Errors
/// Returns a `ConfigError` when two kept selectors resolve to the same name.
pub fn top_level_declared_columns(
    columns: &[config::ColumnConfig],
    normalize_strategy: Option<&str>,
) -> FloeResult<Vec<config::ColumnConfig>> {
    const NESTED_MARKERS: [char; 4] = ['.', '[', '/', '@'];

    let mut names_seen = std::collections::HashSet::new();
    let mut top_level = Vec::new();

    for column in columns {
        let selector = column.source_or_name();
        // Any path-like character means the selector points inside a nested value.
        if selector.contains(&NESTED_MARKERS[..]) {
            continue;
        }

        let name = match normalize_strategy {
            Some(strategy) => crate::checks::normalize::normalize_name(selector, strategy),
            None => selector.to_string(),
        };

        if names_seen.contains(&name) {
            return Err(Box::new(ConfigError(format!(
                "duplicate top-level column selector: {}",
                name
            ))));
        }
        names_seen.insert(name.clone());

        top_level.push(config::ColumnConfig {
            name,
            source: None,
            column_type: column.column_type.clone(),
            nullable: column.nullable,
            unique: column.unique,
            width: column.width,
            trim: column.trim,
        });
    }

    Ok(top_level)
}

/// Chooses the declared columns that the schema-mismatch check compares against.
///
/// For `json` and `xml` sources this is `top_level_declared_columns` applied to the
/// entity's raw schema columns with `normalize_strategy`. For every other format the
/// already-normalized `normalized_columns` are returned unchanged (as a copy).
///
/// # Errors
/// Propagates the duplicate-selector error from `top_level_declared_columns`.
pub fn resolve_mismatch_columns(
    entity: &config::EntityConfig,
    normalized_columns: &[config::ColumnConfig],
    normalize_strategy: Option<&str>,
) -> FloeResult<Vec<config::ColumnConfig>> {
    let is_nested_format = ["json", "xml"]
        .iter()
        .any(|format| entity.source.format == *format);
    if !is_nested_format {
        return Ok(normalized_columns.to_vec());
    }
    top_level_declared_columns(&entity.schema.columns, normalize_strategy)
}

pub fn plan_schema_mismatch(
    entity: &config::EntityConfig,
    declared_columns: &[config::ColumnConfig],
    input_names: &[String],
) -> FloeResult<MismatchOutcome> {
    let normalize_strategy = crate::checks::normalize::resolve_normalize_strategy(entity)?;
    let declared_names = declared_columns
        .iter()
        .map(|column| column.name.clone())
        .collect::<Vec<_>>();
    let input_names = match normalize_strategy.as_deref() {
        Some(strategy) => input_names
            .iter()
            .map(|name| crate::checks::normalize::normalize_name(name, strategy))
            .collect::<Vec<_>>(),
        None => input_names.to_vec(),
    };

    let declared_set = declared_names
        .iter()
        .cloned()
        .collect::<std::collections::HashSet<_>>();
    let input_set = input_names
        .iter()
        .cloned()
        .collect::<std::collections::HashSet<_>>();

    let mut missing = declared_names
        .iter()
        .filter(|name| !input_set.contains(*name))
        .cloned()
        .collect::<Vec<_>>();
    let mut extra = input_names
        .iter()
        .filter(|name| !declared_set.contains(*name))
        .cloned()
        .collect::<Vec<_>>();
    missing.sort();
    extra.sort();

    let mismatch_config = entity.schema.mismatch.as_ref();
    let missing_policy = mismatch_config
        .and_then(|mismatch| mismatch.missing_columns.as_deref())
        .unwrap_or("fill_nulls");
    let extra_policy = mismatch_config
        .and_then(|mismatch| mismatch.extra_columns.as_deref())
        .unwrap_or("ignore");

    let mut effective_missing = missing_policy;
    let mut effective_extra = extra_policy;
    let mut warning = None;
    let rejection_requested = (effective_missing == "reject_file" && !missing.is_empty())
        || (effective_extra == "reject_file" && !extra.is_empty());
    if rejection_requested && entity.policy.severity == "warn" {
        warning = Some(format!(
            "entity.name={} schema mismatch requested reject_file but policy.severity=warn; continuing",
            entity.name
        ));
        effective_missing = "fill_nulls";
        effective_extra = "ignore";
    }

    let mut rejected = false;
    let mut aborted = false;
    let mut action = report::MismatchAction::None;
    if (effective_missing == "reject_file" && !missing.is_empty())
        || (effective_extra == "reject_file" && !extra.is_empty())
    {
        if entity.policy.severity == "abort" {
            aborted = true;
            action = report::MismatchAction::Aborted;
        } else if entity.policy.severity == "reject" {
            rejected = true;
            action = report::MismatchAction::RejectedFile;
        }
    }

    let mut errors = 0;
    let mut fill_missing = false;
    let mut ignore_extra = false;
    if rejected || aborted {
        errors = 1;
    } else {
        if effective_missing == "fill_nulls" && !missing.is_empty() {
            fill_missing = true;
            action = report::MismatchAction::FilledNulls;
        }
        if effective_extra == "ignore" && !extra.is_empty() {
            ignore_extra = true;
            if !fill_missing {
                action = report::MismatchAction::IgnoredExtras;
            }
        }
    }

    let warnings = if warning.is_some() { 1 } else { 0 };
    let error = if rejected || aborted {
        Some(report::MismatchIssue {
            rule: "schema_mismatch".to_string(),
            message: format!(
                "entity.name={} schema mismatch: missing={} extra={}",
                entity.name,
                missing.len(),
                extra.len()
            ),
        })
    } else {
        None
    };

    let mismatch_report = report::FileMismatch {
        declared_columns_count: declared_names.len() as u64,
        input_columns_count: input_names.len() as u64,
        missing_columns: missing.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
        extra_columns: extra.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
        mismatch_action: action,
        error,
        warning,
    };

    Ok(MismatchOutcome {
        report: mismatch_report,
        rejected,
        aborted,
        warnings,
        errors,
        missing,
        extra,
        fill_missing,
        ignore_extra,
    })
}

/// Plans the schema mismatch for this file and, unless the plan rejects or aborts
/// the file, applies it to `typed_df` (and `raw_df` when given).
///
/// Returns the plan so callers can inspect the report and flags.
///
/// # Errors
/// Propagates errors from `plan_schema_mismatch` and `apply_mismatch_plan`.
pub fn apply_schema_mismatch(
    entity: &config::EntityConfig,
    declared_columns: &[config::ColumnConfig],
    input_names: &[String],
    raw_df: Option<&mut DataFrame>,
    typed_df: &mut DataFrame,
) -> FloeResult<MismatchOutcome> {
    let outcome = plan_schema_mismatch(entity, declared_columns, input_names)?;
    let file_is_kept = !(outcome.rejected || outcome.aborted);
    if file_is_kept {
        apply_mismatch_plan(&outcome, declared_columns, raw_df, typed_df)?;
    }
    Ok(outcome)
}

/// Applies the data changes requested by a mismatch plan.
///
/// When `plan.fill_missing` is set, missing columns are added as all-null columns.
/// When `plan.ignore_extra` is set, extra columns are dropped. If `raw_df` is
/// provided, both the raw and typed frames are updated; otherwise only `typed_df`.
///
/// # Errors
/// Returns the first error raised while adding or dropping a column.
pub fn apply_mismatch_plan(
    plan: &MismatchOutcome,
    declared_columns: &[config::ColumnConfig],
    mut raw_df: Option<&mut DataFrame>,
    typed_df: &mut DataFrame,
) -> FloeResult<()> {
    if plan.fill_missing {
        match raw_df.as_deref_mut() {
            Some(raw) => add_missing_columns(raw, typed_df, declared_columns, &plan.missing)?,
            None => add_missing_columns_typed(typed_df, declared_columns, &plan.missing)?,
        }
    }
    if plan.ignore_extra {
        match raw_df {
            Some(raw) => drop_extra_columns(raw, typed_df, &plan.extra)?,
            None => drop_extra_columns_typed(typed_df, &plan.extra)?,
        }
    }
    Ok(())
}

fn add_missing_columns(
    raw_df: &mut DataFrame,
    typed_df: &mut DataFrame,
    declared_columns: &[config::ColumnConfig],
    missing: &[String],
) -> FloeResult<()> {
    let mut types = HashMap::new();
    for column in declared_columns {
        types.insert(
            column.name.as_str(),
            config::parse_data_type(&column.column_type)?,
        );
    }

    let height = raw_df.height();
    for name in missing {
        let raw_series = Series::full_null(name.as_str().into(), height, &DataType::String);
        raw_df.with_column(raw_series).map_err(|err| {
            Box::new(RunError(format!(
                "failed to add missing column {}: {err}",
                name
            )))
        })?;

        let dtype = types
            .get(name.as_str())
            .cloned()
            .unwrap_or(DataType::String);
        let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
        typed_df.with_column(typed_series).map_err(|err| {
            Box::new(RunError(format!(
                "failed to add missing column {}: {err}",
                name
            )))
        })?;
    }
    Ok(())
}

fn add_missing_columns_typed(
    typed_df: &mut DataFrame,
    declared_columns: &[config::ColumnConfig],
    missing: &[String],
) -> FloeResult<()> {
    let mut types = HashMap::new();
    for column in declared_columns {
        types.insert(
            column.name.as_str(),
            config::parse_data_type(&column.column_type)?,
        );
    }

    let height = typed_df.height();
    for name in missing {
        let dtype = types
            .get(name.as_str())
            .cloned()
            .unwrap_or(DataType::String);
        let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
        typed_df.with_column(typed_series).map_err(|err| {
            Box::new(RunError(format!(
                "failed to add missing column {}: {err}",
                name
            )))
        })?;
    }
    Ok(())
}

/// Drops every name in `extra` from `raw_df` and then from `typed_df`.
///
/// Names not present in a frame are skipped. Presence is re-checked per name, so
/// repeated names do not cause a second drop. All raw drops happen before any typed
/// drop.
///
/// # Errors
/// Returns the first error polars reports while dropping a column.
fn drop_extra_columns(
    raw_df: &mut DataFrame,
    typed_df: &mut DataFrame,
    extra: &[String],
) -> FloeResult<()> {
    for frame in [raw_df, typed_df] {
        for name in extra {
            if frame.get_column_index(name).is_none() {
                continue;
            }
            frame.drop_in_place(name).map_err(|err| {
                Box::new(RunError(format!(
                    "failed to drop extra column {}: {err}",
                    name
                )))
            })?;
        }
    }
    Ok(())
}

/// Drops every name in `extra` that is currently present in `typed_df`.
///
/// Presence is re-checked per name, so repeated names do not cause a second drop.
///
/// # Errors
/// Returns the first error polars reports while dropping a column.
fn drop_extra_columns_typed(typed_df: &mut DataFrame, extra: &[String]) -> FloeResult<()> {
    extra.iter().try_for_each(|name| -> FloeResult<()> {
        if typed_df.get_column_index(name).is_some() {
            typed_df.drop_in_place(name).map_err(|err| {
                Box::new(RunError(format!(
                    "failed to drop extra column {}: {err}",
                    name
                )))
            })?;
        }
        Ok(())
    })
}