Skip to main content

floe_core/checks/
mismatch.rs

1use std::collections::HashMap;
2
3use polars::prelude::{DataFrame, DataType, Series};
4
5use crate::config::PolicySeverity;
6use crate::errors::RunError;
7use crate::{config, report, ConfigError, FloeResult};
8
/// Upper bound on how many missing / extra column names are copied into the
/// `report::FileMismatch` built by `plan_schema_mismatch`.
const MAX_MISMATCH_COLUMNS: usize = 50;
10
/// Result of comparing an entity's declared columns with the columns found in
/// an input.
///
/// Produced by `plan_schema_mismatch` and read by `apply_mismatch_plan`.
#[derive(Debug, Clone)]
pub struct MismatchOutcome {
    /// Report entry for the mismatch: column counts, capped name lists, the
    /// action taken, and the optional error / warning.
    pub report: report::FileMismatch,
    /// Set when a `reject_file` policy fired and `policy.severity` is `Reject`.
    pub rejected: bool,
    /// Set when a `reject_file` policy fired and `policy.severity` is `Abort`.
    pub aborted: bool,
    /// 1 when a requested rejection was downgraded because `policy.severity`
    /// is `Warn`, otherwise 0.
    pub warnings: u64,
    /// 1 when the file was rejected or the run aborted, otherwise 0.
    pub errors: u64,
    /// Declared column names absent from the input, sorted and not truncated.
    pub missing: Vec<String>,
    /// Input column names that are not declared, sorted and not truncated.
    pub extra: Vec<String>,
    /// Whether the missing columns should be added as all-null columns.
    pub fill_missing: bool,
    /// Whether the extra columns should be dropped.
    pub ignore_extra: bool,
}
23
24pub fn top_level_declared_columns(
25    columns: &[config::ColumnConfig],
26    normalize_strategy: Option<&str>,
27) -> FloeResult<Vec<config::ColumnConfig>> {
28    let mut resolved = Vec::new();
29    let mut seen = std::collections::HashSet::new();
30    for column in columns {
31        let source = column.source_or_name();
32        if source.contains('.')
33            || source.contains('[')
34            || source.contains('/')
35            || source.contains('@')
36        {
37            continue;
38        }
39        let normalized = if let Some(strategy) = normalize_strategy {
40            crate::checks::normalize::normalize_name(source, strategy)
41        } else {
42            source.to_string()
43        };
44        if !seen.insert(normalized.clone()) {
45            return Err(Box::new(ConfigError(format!(
46                "duplicate top-level column selector: {}",
47                normalized
48            ))));
49        }
50        resolved.push(config::ColumnConfig {
51            name: normalized,
52            source: None,
53            column_type: column.column_type.clone(),
54            nullable: column.nullable,
55            unique: column.unique,
56            width: column.width,
57            trim: column.trim,
58        });
59    }
60    Ok(resolved)
61}
62
63pub fn resolve_mismatch_columns(
64    entity: &config::EntityConfig,
65    normalized_columns: &[config::ColumnConfig],
66    normalize_strategy: Option<&str>,
67) -> FloeResult<Vec<config::ColumnConfig>> {
68    if entity.source.format == "json" || entity.source.format == "xml" {
69        top_level_declared_columns(&entity.schema.columns, normalize_strategy)
70    } else {
71        Ok(normalized_columns.to_vec())
72    }
73}
74
75pub fn plan_schema_mismatch(
76    entity: &config::EntityConfig,
77    declared_columns: &[config::ColumnConfig],
78    input_names: &[String],
79) -> FloeResult<MismatchOutcome> {
80    let normalize_strategy = crate::checks::normalize::resolve_normalize_strategy(entity)?;
81    let declared_names = declared_columns
82        .iter()
83        .map(|column| column.name.clone())
84        .collect::<Vec<_>>();
85    let input_names = match normalize_strategy.as_deref() {
86        Some(strategy) => input_names
87            .iter()
88            .map(|name| crate::checks::normalize::normalize_name(name, strategy))
89            .collect::<Vec<_>>(),
90        None => input_names.to_vec(),
91    };
92
93    let declared_set = declared_names
94        .iter()
95        .cloned()
96        .collect::<std::collections::HashSet<_>>();
97    let input_set = input_names
98        .iter()
99        .cloned()
100        .collect::<std::collections::HashSet<_>>();
101
102    let mut missing = declared_names
103        .iter()
104        .filter(|name| !input_set.contains(*name))
105        .cloned()
106        .collect::<Vec<_>>();
107    let mut extra = input_names
108        .iter()
109        .filter(|name| !declared_set.contains(*name))
110        .cloned()
111        .collect::<Vec<_>>();
112    missing.sort();
113    extra.sort();
114
115    let mismatch_config = entity.schema.mismatch.as_ref();
116    let missing_policy = mismatch_config
117        .and_then(|mismatch| mismatch.missing_columns.as_deref())
118        .unwrap_or("fill_nulls");
119    let extra_policy = mismatch_config
120        .and_then(|mismatch| mismatch.extra_columns.as_deref())
121        .unwrap_or("ignore");
122
123    let mut effective_missing = missing_policy;
124    let mut effective_extra = extra_policy;
125    let mut warning = None;
126    let rejection_requested = (effective_missing == "reject_file" && !missing.is_empty())
127        || (effective_extra == "reject_file" && !extra.is_empty());
128    if rejection_requested && entity.policy.severity == PolicySeverity::Warn {
129        warning = Some(format!(
130            "entity.name={} schema mismatch requested reject_file but policy.severity=warn; continuing",
131            entity.name
132        ));
133        effective_missing = "fill_nulls";
134        effective_extra = "ignore";
135    }
136
137    let mut rejected = false;
138    let mut aborted = false;
139    let mut action = report::MismatchAction::None;
140    if (effective_missing == "reject_file" && !missing.is_empty())
141        || (effective_extra == "reject_file" && !extra.is_empty())
142    {
143        if entity.policy.severity == PolicySeverity::Abort {
144            aborted = true;
145            action = report::MismatchAction::Aborted;
146        } else if entity.policy.severity == PolicySeverity::Reject {
147            rejected = true;
148            action = report::MismatchAction::RejectedFile;
149        }
150    }
151
152    let mut errors = 0;
153    let mut fill_missing = false;
154    let mut ignore_extra = false;
155    if rejected || aborted {
156        errors = 1;
157    } else {
158        if effective_missing == "fill_nulls" && !missing.is_empty() {
159            fill_missing = true;
160            action = report::MismatchAction::FilledNulls;
161        }
162        if effective_extra == "ignore" && !extra.is_empty() {
163            ignore_extra = true;
164            if !fill_missing {
165                action = report::MismatchAction::IgnoredExtras;
166            }
167        }
168    }
169
170    let warnings = if warning.is_some() { 1 } else { 0 };
171    let error = if rejected || aborted {
172        Some(report::MismatchIssue {
173            rule: "schema_mismatch".to_string(),
174            message: format!(
175                "entity.name={} schema mismatch: missing={} extra={}",
176                entity.name,
177                missing.len(),
178                extra.len()
179            ),
180        })
181    } else {
182        None
183    };
184
185    let mismatch_report = report::FileMismatch {
186        declared_columns_count: declared_names.len() as u64,
187        input_columns_count: input_names.len() as u64,
188        missing_columns: missing.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
189        extra_columns: extra.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
190        mismatch_action: action,
191        error,
192        warning,
193    };
194
195    Ok(MismatchOutcome {
196        report: mismatch_report,
197        rejected,
198        aborted,
199        warnings,
200        errors,
201        missing,
202        extra,
203        fill_missing,
204        ignore_extra,
205    })
206}
207
208pub fn apply_schema_mismatch(
209    entity: &config::EntityConfig,
210    declared_columns: &[config::ColumnConfig],
211    input_names: &[String],
212    raw_df: Option<&mut DataFrame>,
213    typed_df: &mut DataFrame,
214) -> FloeResult<MismatchOutcome> {
215    let plan = plan_schema_mismatch(entity, declared_columns, input_names)?;
216    if !plan.rejected && !plan.aborted {
217        apply_mismatch_plan(&plan, declared_columns, raw_df, typed_df)?;
218    }
219    Ok(plan)
220}
221
222pub fn apply_mismatch_plan(
223    plan: &MismatchOutcome,
224    declared_columns: &[config::ColumnConfig],
225    mut raw_df: Option<&mut DataFrame>,
226    typed_df: &mut DataFrame,
227) -> FloeResult<()> {
228    if plan.fill_missing {
229        if let Some(raw_df) = raw_df.as_mut() {
230            add_missing_columns(raw_df, typed_df, declared_columns, &plan.missing)?;
231        } else {
232            add_missing_columns_typed(typed_df, declared_columns, &plan.missing)?;
233        }
234    }
235    if plan.ignore_extra {
236        if let Some(raw_df) = raw_df.as_mut() {
237            drop_extra_columns(raw_df, typed_df, &plan.extra)?;
238        } else {
239            drop_extra_columns_typed(typed_df, &plan.extra)?;
240        }
241    }
242    Ok(())
243}
244
245fn add_missing_columns(
246    raw_df: &mut DataFrame,
247    typed_df: &mut DataFrame,
248    declared_columns: &[config::ColumnConfig],
249    missing: &[String],
250) -> FloeResult<()> {
251    let mut types = HashMap::new();
252    for column in declared_columns {
253        types.insert(
254            column.name.as_str(),
255            config::parse_data_type(&column.column_type)?,
256        );
257    }
258
259    let height = raw_df.height();
260    for name in missing {
261        let raw_series = Series::full_null(name.as_str().into(), height, &DataType::String);
262        raw_df.with_column(raw_series).map_err(|err| {
263            Box::new(RunError(format!(
264                "failed to add missing column {}: {err}",
265                name
266            )))
267        })?;
268
269        let dtype = types
270            .get(name.as_str())
271            .cloned()
272            .unwrap_or(DataType::String);
273        let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
274        typed_df.with_column(typed_series).map_err(|err| {
275            Box::new(RunError(format!(
276                "failed to add missing column {}: {err}",
277                name
278            )))
279        })?;
280    }
281    Ok(())
282}
283
284fn add_missing_columns_typed(
285    typed_df: &mut DataFrame,
286    declared_columns: &[config::ColumnConfig],
287    missing: &[String],
288) -> FloeResult<()> {
289    let mut types = HashMap::new();
290    for column in declared_columns {
291        types.insert(
292            column.name.as_str(),
293            config::parse_data_type(&column.column_type)?,
294        );
295    }
296
297    let height = typed_df.height();
298    for name in missing {
299        let dtype = types
300            .get(name.as_str())
301            .cloned()
302            .unwrap_or(DataType::String);
303        let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
304        typed_df.with_column(typed_series).map_err(|err| {
305            Box::new(RunError(format!(
306                "failed to add missing column {}: {err}",
307                name
308            )))
309        })?;
310    }
311    Ok(())
312}
313
314fn drop_extra_columns(
315    raw_df: &mut DataFrame,
316    typed_df: &mut DataFrame,
317    extra: &[String],
318) -> FloeResult<()> {
319    for name in extra {
320        if raw_df.get_column_index(name).is_some() {
321            raw_df.drop_in_place(name).map_err(|err| {
322                Box::new(RunError(format!(
323                    "failed to drop extra column {}: {err}",
324                    name
325                )))
326            })?;
327        }
328    }
329
330    for name in extra {
331        if typed_df.get_column_index(name).is_some() {
332            typed_df.drop_in_place(name).map_err(|err| {
333                Box::new(RunError(format!(
334                    "failed to drop extra column {}: {err}",
335                    name
336                )))
337            })?;
338        }
339    }
340    Ok(())
341}
342
343fn drop_extra_columns_typed(typed_df: &mut DataFrame, extra: &[String]) -> FloeResult<()> {
344    for name in extra {
345        if typed_df.get_column_index(name).is_some() {
346            typed_df.drop_in_place(name).map_err(|err| {
347                Box::new(RunError(format!(
348                    "failed to drop extra column {}: {err}",
349                    name
350                )))
351            })?;
352        }
353    }
354    Ok(())
355}