// floe_core/checks/mismatch.rs

use std::collections::{HashMap, HashSet};

use polars::prelude::{DataFrame, DataType, Series};

use crate::{config, report, ConfigError, FloeResult};

/// Maximum number of missing / extra column names listed in a file's mismatch report.
///
/// Longer lists are truncated to this many (sorted) entries; the declared and input
/// column counts in the report are not affected by the truncation.
const MAX_MISMATCH_COLUMNS: usize = 50;

9#[derive(Debug, Clone)]
10pub struct MismatchOutcome {
11    pub report: report::FileMismatch,
12    pub rejected: bool,
13    pub aborted: bool,
14    pub warnings: u64,
15    pub errors: u64,
16}
17
18pub fn apply_schema_mismatch(
19    entity: &config::EntityConfig,
20    declared_columns: &[config::ColumnConfig],
21    raw_df: &mut DataFrame,
22    typed_df: &mut DataFrame,
23) -> FloeResult<MismatchOutcome> {
24    let declared_names = declared_columns
25        .iter()
26        .map(|column| column.name.clone())
27        .collect::<Vec<_>>();
28    let input_names = raw_df
29        .get_column_names()
30        .iter()
31        .map(|name| name.to_string())
32        .collect::<Vec<_>>();
33
34    let declared_set = declared_names
35        .iter()
36        .cloned()
37        .collect::<std::collections::HashSet<_>>();
38    let input_set = input_names
39        .iter()
40        .cloned()
41        .collect::<std::collections::HashSet<_>>();
42
43    let mut missing = declared_names
44        .iter()
45        .filter(|name| !input_set.contains(*name))
46        .cloned()
47        .collect::<Vec<_>>();
48    let mut extra = input_names
49        .iter()
50        .filter(|name| !declared_set.contains(*name))
51        .cloned()
52        .collect::<Vec<_>>();
53    missing.sort();
54    extra.sort();
55
56    let mismatch_config = entity.schema.mismatch.as_ref();
57    let missing_policy = mismatch_config
58        .and_then(|mismatch| mismatch.missing_columns.as_deref())
59        .unwrap_or("fill_nulls");
60    let extra_policy = mismatch_config
61        .and_then(|mismatch| mismatch.extra_columns.as_deref())
62        .unwrap_or("ignore");
63
64    let mut effective_missing = missing_policy;
65    let mut effective_extra = extra_policy;
66    let mut warning = None;
67    let rejection_requested = (effective_missing == "reject_file" && !missing.is_empty())
68        || (effective_extra == "reject_file" && !extra.is_empty());
69    if rejection_requested && entity.policy.severity == "warn" {
70        warning = Some(format!(
71            "entity.name={} schema mismatch requested reject_file but policy.severity=warn; continuing",
72            entity.name
73        ));
74        effective_missing = "fill_nulls";
75        effective_extra = "ignore";
76        eprintln!(
77            "warn: {}",
78            warning.as_deref().unwrap_or("schema mismatch override")
79        );
80    }
81
82    let mut rejected = false;
83    let mut aborted = false;
84    let mut action = report::MismatchAction::None;
85    if (effective_missing == "reject_file" && !missing.is_empty())
86        || (effective_extra == "reject_file" && !extra.is_empty())
87    {
88        if entity.policy.severity == "abort" {
89            aborted = true;
90            action = report::MismatchAction::Aborted;
91        } else if entity.policy.severity == "reject" {
92            rejected = true;
93            action = report::MismatchAction::RejectedFile;
94        }
95    }
96
97    let mut errors = 0;
98    if rejected || aborted {
99        errors = 1;
100    } else {
101        let mut filled = false;
102        let mut ignored = false;
103        if effective_missing == "fill_nulls" && !missing.is_empty() {
104            add_missing_columns(raw_df, typed_df, declared_columns, &missing)?;
105            filled = true;
106        }
107        if effective_extra == "ignore" && !extra.is_empty() {
108            drop_extra_columns(raw_df, &extra)?;
109            drop_extra_columns(typed_df, &extra)?;
110            ignored = true;
111        }
112        if filled {
113            action = report::MismatchAction::FilledNulls;
114        } else if ignored {
115            action = report::MismatchAction::IgnoredExtras;
116        }
117    }
118
119    let warnings = if warning.is_some() { 1 } else { 0 };
120    let error = if rejected || aborted {
121        Some(report::MismatchIssue {
122            rule: "schema_mismatch".to_string(),
123            message: format!(
124                "entity.name={} schema mismatch: missing={} extra={}",
125                entity.name,
126                missing.len(),
127                extra.len()
128            ),
129        })
130    } else {
131        None
132    };
133
134    let mismatch_report = report::FileMismatch {
135        declared_columns_count: declared_names.len() as u64,
136        input_columns_count: input_names.len() as u64,
137        missing_columns: missing.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
138        extra_columns: extra.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
139        mismatch_action: action,
140        error,
141        warning,
142    };
143
144    Ok(MismatchOutcome {
145        report: mismatch_report,
146        rejected,
147        aborted,
148        warnings,
149        errors,
150    })
151}
152
153fn add_missing_columns(
154    raw_df: &mut DataFrame,
155    typed_df: &mut DataFrame,
156    declared_columns: &[config::ColumnConfig],
157    missing: &[String],
158) -> FloeResult<()> {
159    let mut types = HashMap::new();
160    for column in declared_columns {
161        types.insert(
162            column.name.as_str(),
163            config::parse_data_type(&column.column_type)?,
164        );
165    }
166
167    let height = raw_df.height();
168    for name in missing {
169        let raw_series = Series::full_null(name.as_str().into(), height, &DataType::String);
170        raw_df.with_column(raw_series).map_err(|err| {
171            Box::new(ConfigError(format!(
172                "failed to add missing column {}: {err}",
173                name
174            )))
175        })?;
176
177        let dtype = types
178            .get(name.as_str())
179            .cloned()
180            .unwrap_or(DataType::String);
181        let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
182        typed_df.with_column(typed_series).map_err(|err| {
183            Box::new(ConfigError(format!(
184                "failed to add missing column {}: {err}",
185                name
186            )))
187        })?;
188    }
189    Ok(())
190}
191
192fn drop_extra_columns(df: &mut DataFrame, extra: &[String]) -> FloeResult<()> {
193    for name in extra {
194        df.drop_in_place(name).map_err(|err| {
195            Box::new(ConfigError(format!(
196                "failed to drop extra column {}: {err}",
197                name
198            )))
199        })?;
200    }
201    Ok(())
202}