Skip to main content

floe_core/checks/
mismatch.rs

1use std::collections::HashMap;
2
3use polars::prelude::{DataFrame, DataType, Series};
4
5use crate::errors::RunError;
6use crate::{config, report, ConfigError, FloeResult};
7
/// Upper bound on how many missing / extra column names are copied into the
/// per-file mismatch report. The complete, untruncated lists are still kept on
/// `MismatchOutcome::missing` and `MismatchOutcome::extra`.
const MAX_MISMATCH_COLUMNS: usize = 50;
9
10#[derive(Debug, Clone)]
11pub struct MismatchOutcome {
12    pub report: report::FileMismatch,
13    pub rejected: bool,
14    pub aborted: bool,
15    pub warnings: u64,
16    pub errors: u64,
17    pub missing: Vec<String>,
18    pub extra: Vec<String>,
19    pub fill_missing: bool,
20    pub ignore_extra: bool,
21}
22
23pub fn top_level_declared_columns(
24    columns: &[config::ColumnConfig],
25    normalize_strategy: Option<&str>,
26) -> FloeResult<Vec<config::ColumnConfig>> {
27    let mut resolved = Vec::new();
28    let mut seen = std::collections::HashSet::new();
29    for column in columns {
30        let source = column.source_or_name();
31        if source.contains('.')
32            || source.contains('[')
33            || source.contains('/')
34            || source.contains('@')
35        {
36            continue;
37        }
38        let normalized = if let Some(strategy) = normalize_strategy {
39            crate::checks::normalize::normalize_name(source, strategy)
40        } else {
41            source.to_string()
42        };
43        if !seen.insert(normalized.clone()) {
44            return Err(Box::new(ConfigError(format!(
45                "duplicate top-level column selector: {}",
46                normalized
47            ))));
48        }
49        resolved.push(config::ColumnConfig {
50            name: normalized,
51            source: None,
52            column_type: column.column_type.clone(),
53            nullable: column.nullable,
54            unique: column.unique,
55            width: column.width,
56            trim: column.trim,
57        });
58    }
59    Ok(resolved)
60}
61
62pub fn resolve_mismatch_columns(
63    entity: &config::EntityConfig,
64    normalized_columns: &[config::ColumnConfig],
65    normalize_strategy: Option<&str>,
66) -> FloeResult<Vec<config::ColumnConfig>> {
67    if entity.source.format == "json" || entity.source.format == "xml" {
68        top_level_declared_columns(&entity.schema.columns, normalize_strategy)
69    } else {
70        Ok(normalized_columns.to_vec())
71    }
72}
73
74pub fn plan_schema_mismatch(
75    entity: &config::EntityConfig,
76    declared_columns: &[config::ColumnConfig],
77    input_names: &[String],
78) -> FloeResult<MismatchOutcome> {
79    let normalize_strategy = crate::checks::normalize::resolve_normalize_strategy(entity)?;
80    let declared_names = declared_columns
81        .iter()
82        .map(|column| column.name.clone())
83        .collect::<Vec<_>>();
84    let input_names = match normalize_strategy.as_deref() {
85        Some(strategy) => input_names
86            .iter()
87            .map(|name| crate::checks::normalize::normalize_name(name, strategy))
88            .collect::<Vec<_>>(),
89        None => input_names.to_vec(),
90    };
91
92    let declared_set = declared_names
93        .iter()
94        .cloned()
95        .collect::<std::collections::HashSet<_>>();
96    let input_set = input_names
97        .iter()
98        .cloned()
99        .collect::<std::collections::HashSet<_>>();
100
101    let mut missing = declared_names
102        .iter()
103        .filter(|name| !input_set.contains(*name))
104        .cloned()
105        .collect::<Vec<_>>();
106    let mut extra = input_names
107        .iter()
108        .filter(|name| !declared_set.contains(*name))
109        .cloned()
110        .collect::<Vec<_>>();
111    missing.sort();
112    extra.sort();
113
114    let mismatch_config = entity.schema.mismatch.as_ref();
115    let missing_policy = mismatch_config
116        .and_then(|mismatch| mismatch.missing_columns.as_deref())
117        .unwrap_or("fill_nulls");
118    let extra_policy = mismatch_config
119        .and_then(|mismatch| mismatch.extra_columns.as_deref())
120        .unwrap_or("ignore");
121
122    let mut effective_missing = missing_policy;
123    let mut effective_extra = extra_policy;
124    let mut warning = None;
125    let rejection_requested = (effective_missing == "reject_file" && !missing.is_empty())
126        || (effective_extra == "reject_file" && !extra.is_empty());
127    if rejection_requested && entity.policy.severity == "warn" {
128        warning = Some(format!(
129            "entity.name={} schema mismatch requested reject_file but policy.severity=warn; continuing",
130            entity.name
131        ));
132        effective_missing = "fill_nulls";
133        effective_extra = "ignore";
134    }
135
136    let mut rejected = false;
137    let mut aborted = false;
138    let mut action = report::MismatchAction::None;
139    if (effective_missing == "reject_file" && !missing.is_empty())
140        || (effective_extra == "reject_file" && !extra.is_empty())
141    {
142        if entity.policy.severity == "abort" {
143            aborted = true;
144            action = report::MismatchAction::Aborted;
145        } else if entity.policy.severity == "reject" {
146            rejected = true;
147            action = report::MismatchAction::RejectedFile;
148        }
149    }
150
151    let mut errors = 0;
152    let mut fill_missing = false;
153    let mut ignore_extra = false;
154    if rejected || aborted {
155        errors = 1;
156    } else {
157        if effective_missing == "fill_nulls" && !missing.is_empty() {
158            fill_missing = true;
159            action = report::MismatchAction::FilledNulls;
160        }
161        if effective_extra == "ignore" && !extra.is_empty() {
162            ignore_extra = true;
163            if !fill_missing {
164                action = report::MismatchAction::IgnoredExtras;
165            }
166        }
167    }
168
169    let warnings = if warning.is_some() { 1 } else { 0 };
170    let error = if rejected || aborted {
171        Some(report::MismatchIssue {
172            rule: "schema_mismatch".to_string(),
173            message: format!(
174                "entity.name={} schema mismatch: missing={} extra={}",
175                entity.name,
176                missing.len(),
177                extra.len()
178            ),
179        })
180    } else {
181        None
182    };
183
184    let mismatch_report = report::FileMismatch {
185        declared_columns_count: declared_names.len() as u64,
186        input_columns_count: input_names.len() as u64,
187        missing_columns: missing.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
188        extra_columns: extra.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
189        mismatch_action: action,
190        error,
191        warning,
192    };
193
194    Ok(MismatchOutcome {
195        report: mismatch_report,
196        rejected,
197        aborted,
198        warnings,
199        errors,
200        missing,
201        extra,
202        fill_missing,
203        ignore_extra,
204    })
205}
206
207pub fn apply_schema_mismatch(
208    entity: &config::EntityConfig,
209    declared_columns: &[config::ColumnConfig],
210    input_names: &[String],
211    raw_df: Option<&mut DataFrame>,
212    typed_df: &mut DataFrame,
213) -> FloeResult<MismatchOutcome> {
214    let plan = plan_schema_mismatch(entity, declared_columns, input_names)?;
215    if !plan.rejected && !plan.aborted {
216        apply_mismatch_plan(&plan, declared_columns, raw_df, typed_df)?;
217    }
218    Ok(plan)
219}
220
221pub fn apply_mismatch_plan(
222    plan: &MismatchOutcome,
223    declared_columns: &[config::ColumnConfig],
224    mut raw_df: Option<&mut DataFrame>,
225    typed_df: &mut DataFrame,
226) -> FloeResult<()> {
227    if plan.fill_missing {
228        if let Some(raw_df) = raw_df.as_mut() {
229            add_missing_columns(raw_df, typed_df, declared_columns, &plan.missing)?;
230        } else {
231            add_missing_columns_typed(typed_df, declared_columns, &plan.missing)?;
232        }
233    }
234    if plan.ignore_extra {
235        if let Some(raw_df) = raw_df.as_mut() {
236            drop_extra_columns(raw_df, typed_df, &plan.extra)?;
237        } else {
238            drop_extra_columns_typed(typed_df, &plan.extra)?;
239        }
240    }
241    Ok(())
242}
243
244fn add_missing_columns(
245    raw_df: &mut DataFrame,
246    typed_df: &mut DataFrame,
247    declared_columns: &[config::ColumnConfig],
248    missing: &[String],
249) -> FloeResult<()> {
250    let mut types = HashMap::new();
251    for column in declared_columns {
252        types.insert(
253            column.name.as_str(),
254            config::parse_data_type(&column.column_type)?,
255        );
256    }
257
258    let height = raw_df.height();
259    for name in missing {
260        let raw_series = Series::full_null(name.as_str().into(), height, &DataType::String);
261        raw_df.with_column(raw_series).map_err(|err| {
262            Box::new(RunError(format!(
263                "failed to add missing column {}: {err}",
264                name
265            )))
266        })?;
267
268        let dtype = types
269            .get(name.as_str())
270            .cloned()
271            .unwrap_or(DataType::String);
272        let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
273        typed_df.with_column(typed_series).map_err(|err| {
274            Box::new(RunError(format!(
275                "failed to add missing column {}: {err}",
276                name
277            )))
278        })?;
279    }
280    Ok(())
281}
282
283fn add_missing_columns_typed(
284    typed_df: &mut DataFrame,
285    declared_columns: &[config::ColumnConfig],
286    missing: &[String],
287) -> FloeResult<()> {
288    let mut types = HashMap::new();
289    for column in declared_columns {
290        types.insert(
291            column.name.as_str(),
292            config::parse_data_type(&column.column_type)?,
293        );
294    }
295
296    let height = typed_df.height();
297    for name in missing {
298        let dtype = types
299            .get(name.as_str())
300            .cloned()
301            .unwrap_or(DataType::String);
302        let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
303        typed_df.with_column(typed_series).map_err(|err| {
304            Box::new(RunError(format!(
305                "failed to add missing column {}: {err}",
306                name
307            )))
308        })?;
309    }
310    Ok(())
311}
312
313fn drop_extra_columns(
314    raw_df: &mut DataFrame,
315    typed_df: &mut DataFrame,
316    extra: &[String],
317) -> FloeResult<()> {
318    for name in extra {
319        if raw_df.get_column_index(name).is_some() {
320            raw_df.drop_in_place(name).map_err(|err| {
321                Box::new(RunError(format!(
322                    "failed to drop extra column {}: {err}",
323                    name
324                )))
325            })?;
326        }
327    }
328
329    for name in extra {
330        if typed_df.get_column_index(name).is_some() {
331            typed_df.drop_in_place(name).map_err(|err| {
332                Box::new(RunError(format!(
333                    "failed to drop extra column {}: {err}",
334                    name
335                )))
336            })?;
337        }
338    }
339    Ok(())
340}
341
342fn drop_extra_columns_typed(typed_df: &mut DataFrame, extra: &[String]) -> FloeResult<()> {
343    for name in extra {
344        if typed_df.get_column_index(name).is_some() {
345            typed_df.drop_in_place(name).map_err(|err| {
346                Box::new(RunError(format!(
347                    "failed to drop extra column {}: {err}",
348                    name
349                )))
350            })?;
351        }
352    }
353    Ok(())
354}