Skip to main content

floe_core/checks/mismatch.rs

1use std::collections::HashMap;
2
3use polars::prelude::{DataFrame, DataType, Series};
4
5use crate::errors::RunError;
6use crate::{config, report, FloeResult};
7
/// Maximum number of missing / extra column names copied into a `report::FileMismatch`.
///
/// The full, untruncated lists remain available on `MismatchOutcome::missing` and
/// `MismatchOutcome::extra`.
const MAX_MISMATCH_COLUMNS: usize = 50;
9
10#[derive(Debug, Clone)]
11pub struct MismatchOutcome {
12    pub report: report::FileMismatch,
13    pub rejected: bool,
14    pub aborted: bool,
15    pub warnings: u64,
16    pub errors: u64,
17    pub missing: Vec<String>,
18    pub extra: Vec<String>,
19    pub fill_missing: bool,
20    pub ignore_extra: bool,
21}
22
23pub fn plan_schema_mismatch(
24    entity: &config::EntityConfig,
25    declared_columns: &[config::ColumnConfig],
26    input_names: &[String],
27) -> FloeResult<MismatchOutcome> {
28    let normalize_strategy = crate::run::normalize::resolve_normalize_strategy(entity)?;
29    let declared_names = declared_columns
30        .iter()
31        .map(|column| column.name.clone())
32        .collect::<Vec<_>>();
33    let input_names = match normalize_strategy.as_deref() {
34        Some(strategy) => input_names
35            .iter()
36            .map(|name| crate::run::normalize::normalize_name(name, strategy))
37            .collect::<Vec<_>>(),
38        None => input_names.to_vec(),
39    };
40
41    let declared_set = declared_names
42        .iter()
43        .cloned()
44        .collect::<std::collections::HashSet<_>>();
45    let input_set = input_names
46        .iter()
47        .cloned()
48        .collect::<std::collections::HashSet<_>>();
49
50    let mut missing = declared_names
51        .iter()
52        .filter(|name| !input_set.contains(*name))
53        .cloned()
54        .collect::<Vec<_>>();
55    let mut extra = input_names
56        .iter()
57        .filter(|name| !declared_set.contains(*name))
58        .cloned()
59        .collect::<Vec<_>>();
60    missing.sort();
61    extra.sort();
62
63    let mismatch_config = entity.schema.mismatch.as_ref();
64    let missing_policy = mismatch_config
65        .and_then(|mismatch| mismatch.missing_columns.as_deref())
66        .unwrap_or("fill_nulls");
67    let extra_policy = mismatch_config
68        .and_then(|mismatch| mismatch.extra_columns.as_deref())
69        .unwrap_or("ignore");
70
71    let mut effective_missing = missing_policy;
72    let mut effective_extra = extra_policy;
73    let mut warning = None;
74    let rejection_requested = (effective_missing == "reject_file" && !missing.is_empty())
75        || (effective_extra == "reject_file" && !extra.is_empty());
76    if rejection_requested && entity.policy.severity == "warn" {
77        warning = Some(format!(
78            "entity.name={} schema mismatch requested reject_file but policy.severity=warn; continuing",
79            entity.name
80        ));
81        effective_missing = "fill_nulls";
82        effective_extra = "ignore";
83    }
84
85    let mut rejected = false;
86    let mut aborted = false;
87    let mut action = report::MismatchAction::None;
88    if (effective_missing == "reject_file" && !missing.is_empty())
89        || (effective_extra == "reject_file" && !extra.is_empty())
90    {
91        if entity.policy.severity == "abort" {
92            aborted = true;
93            action = report::MismatchAction::Aborted;
94        } else if entity.policy.severity == "reject" {
95            rejected = true;
96            action = report::MismatchAction::RejectedFile;
97        }
98    }
99
100    let mut errors = 0;
101    let mut fill_missing = false;
102    let mut ignore_extra = false;
103    if rejected || aborted {
104        errors = 1;
105    } else {
106        if effective_missing == "fill_nulls" && !missing.is_empty() {
107            fill_missing = true;
108            action = report::MismatchAction::FilledNulls;
109        }
110        if effective_extra == "ignore" && !extra.is_empty() {
111            ignore_extra = true;
112            if !fill_missing {
113                action = report::MismatchAction::IgnoredExtras;
114            }
115        }
116    }
117
118    let warnings = if warning.is_some() { 1 } else { 0 };
119    let error = if rejected || aborted {
120        Some(report::MismatchIssue {
121            rule: "schema_mismatch".to_string(),
122            message: format!(
123                "entity.name={} schema mismatch: missing={} extra={}",
124                entity.name,
125                missing.len(),
126                extra.len()
127            ),
128        })
129    } else {
130        None
131    };
132
133    let mismatch_report = report::FileMismatch {
134        declared_columns_count: declared_names.len() as u64,
135        input_columns_count: input_names.len() as u64,
136        missing_columns: missing.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
137        extra_columns: extra.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
138        mismatch_action: action,
139        error,
140        warning,
141    };
142
143    Ok(MismatchOutcome {
144        report: mismatch_report,
145        rejected,
146        aborted,
147        warnings,
148        errors,
149        missing,
150        extra,
151        fill_missing,
152        ignore_extra,
153    })
154}
155
156pub fn apply_schema_mismatch(
157    entity: &config::EntityConfig,
158    declared_columns: &[config::ColumnConfig],
159    input_names: &[String],
160    raw_df: Option<&mut DataFrame>,
161    typed_df: &mut DataFrame,
162) -> FloeResult<MismatchOutcome> {
163    let plan = plan_schema_mismatch(entity, declared_columns, input_names)?;
164    if !plan.rejected && !plan.aborted {
165        apply_mismatch_plan(&plan, declared_columns, raw_df, typed_df)?;
166    }
167    Ok(plan)
168}
169
170pub fn apply_mismatch_plan(
171    plan: &MismatchOutcome,
172    declared_columns: &[config::ColumnConfig],
173    mut raw_df: Option<&mut DataFrame>,
174    typed_df: &mut DataFrame,
175) -> FloeResult<()> {
176    if plan.fill_missing {
177        if let Some(raw_df) = raw_df.as_mut() {
178            add_missing_columns(raw_df, typed_df, declared_columns, &plan.missing)?;
179        } else {
180            add_missing_columns_typed(typed_df, declared_columns, &plan.missing)?;
181        }
182    }
183    if plan.ignore_extra {
184        if let Some(raw_df) = raw_df.as_mut() {
185            drop_extra_columns(raw_df, typed_df, &plan.extra)?;
186        } else {
187            drop_extra_columns_typed(typed_df, &plan.extra)?;
188        }
189    }
190    Ok(())
191}
192
193fn add_missing_columns(
194    raw_df: &mut DataFrame,
195    typed_df: &mut DataFrame,
196    declared_columns: &[config::ColumnConfig],
197    missing: &[String],
198) -> FloeResult<()> {
199    let mut types = HashMap::new();
200    for column in declared_columns {
201        types.insert(
202            column.name.as_str(),
203            config::parse_data_type(&column.column_type)?,
204        );
205    }
206
207    let height = raw_df.height();
208    for name in missing {
209        let raw_series = Series::full_null(name.as_str().into(), height, &DataType::String);
210        raw_df.with_column(raw_series).map_err(|err| {
211            Box::new(RunError(format!(
212                "failed to add missing column {}: {err}",
213                name
214            )))
215        })?;
216
217        let dtype = types
218            .get(name.as_str())
219            .cloned()
220            .unwrap_or(DataType::String);
221        let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
222        typed_df.with_column(typed_series).map_err(|err| {
223            Box::new(RunError(format!(
224                "failed to add missing column {}: {err}",
225                name
226            )))
227        })?;
228    }
229    Ok(())
230}
231
232fn add_missing_columns_typed(
233    typed_df: &mut DataFrame,
234    declared_columns: &[config::ColumnConfig],
235    missing: &[String],
236) -> FloeResult<()> {
237    let mut types = HashMap::new();
238    for column in declared_columns {
239        types.insert(
240            column.name.as_str(),
241            config::parse_data_type(&column.column_type)?,
242        );
243    }
244
245    let height = typed_df.height();
246    for name in missing {
247        let dtype = types
248            .get(name.as_str())
249            .cloned()
250            .unwrap_or(DataType::String);
251        let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
252        typed_df.with_column(typed_series).map_err(|err| {
253            Box::new(RunError(format!(
254                "failed to add missing column {}: {err}",
255                name
256            )))
257        })?;
258    }
259    Ok(())
260}
261
262fn drop_extra_columns(
263    raw_df: &mut DataFrame,
264    typed_df: &mut DataFrame,
265    extra: &[String],
266) -> FloeResult<()> {
267    for name in extra {
268        if raw_df.get_column_index(name).is_some() {
269            raw_df.drop_in_place(name).map_err(|err| {
270                Box::new(RunError(format!(
271                    "failed to drop extra column {}: {err}",
272                    name
273                )))
274            })?;
275        }
276    }
277
278    for name in extra {
279        if typed_df.get_column_index(name).is_some() {
280            typed_df.drop_in_place(name).map_err(|err| {
281                Box::new(RunError(format!(
282                    "failed to drop extra column {}: {err}",
283                    name
284                )))
285            })?;
286        }
287    }
288    Ok(())
289}
290
291fn drop_extra_columns_typed(typed_df: &mut DataFrame, extra: &[String]) -> FloeResult<()> {
292    for name in extra {
293        if typed_df.get_column_index(name).is_some() {
294            typed_df.drop_in_place(name).map_err(|err| {
295                Box::new(RunError(format!(
296                    "failed to drop extra column {}: {err}",
297                    name
298                )))
299            })?;
300        }
301    }
302    Ok(())
303}