floe_core/checks/
mismatch.rs1use std::collections::HashMap;
2
3use polars::prelude::{DataFrame, DataType, Series};
4
5use crate::{config, report, ConfigError, FloeResult};
6
/// Maximum number of missing (and, separately, extra) column names copied into a
/// mismatch report; names beyond this cap are omitted from the report lists.
const MAX_MISMATCH_COLUMNS: usize = 50;
8
9#[derive(Debug, Clone)]
10pub struct MismatchOutcome {
11 pub report: report::FileMismatch,
12 pub rejected: bool,
13 pub aborted: bool,
14 pub warnings: u64,
15 pub errors: u64,
16}
17
18pub fn apply_schema_mismatch(
19 entity: &config::EntityConfig,
20 declared_columns: &[config::ColumnConfig],
21 raw_df: &mut DataFrame,
22 typed_df: &mut DataFrame,
23) -> FloeResult<MismatchOutcome> {
24 let declared_names = declared_columns
25 .iter()
26 .map(|column| column.name.clone())
27 .collect::<Vec<_>>();
28 let input_names = raw_df
29 .get_column_names()
30 .iter()
31 .map(|name| name.to_string())
32 .collect::<Vec<_>>();
33
34 let declared_set = declared_names
35 .iter()
36 .cloned()
37 .collect::<std::collections::HashSet<_>>();
38 let input_set = input_names
39 .iter()
40 .cloned()
41 .collect::<std::collections::HashSet<_>>();
42
43 let mut missing = declared_names
44 .iter()
45 .filter(|name| !input_set.contains(*name))
46 .cloned()
47 .collect::<Vec<_>>();
48 let mut extra = input_names
49 .iter()
50 .filter(|name| !declared_set.contains(*name))
51 .cloned()
52 .collect::<Vec<_>>();
53 missing.sort();
54 extra.sort();
55
56 let mismatch_config = entity.schema.mismatch.as_ref();
57 let missing_policy = mismatch_config
58 .and_then(|mismatch| mismatch.missing_columns.as_deref())
59 .unwrap_or("fill_nulls");
60 let extra_policy = mismatch_config
61 .and_then(|mismatch| mismatch.extra_columns.as_deref())
62 .unwrap_or("ignore");
63
64 let mut effective_missing = missing_policy;
65 let mut effective_extra = extra_policy;
66 let mut warning = None;
67 let rejection_requested = (effective_missing == "reject_file" && !missing.is_empty())
68 || (effective_extra == "reject_file" && !extra.is_empty());
69 if rejection_requested && entity.policy.severity == "warn" {
70 warning = Some(format!(
71 "entity.name={} schema mismatch requested reject_file but policy.severity=warn; continuing",
72 entity.name
73 ));
74 effective_missing = "fill_nulls";
75 effective_extra = "ignore";
76 eprintln!(
77 "warn: {}",
78 warning.as_deref().unwrap_or("schema mismatch override")
79 );
80 }
81
82 let mut rejected = false;
83 let mut aborted = false;
84 let mut action = report::MismatchAction::None;
85 if (effective_missing == "reject_file" && !missing.is_empty())
86 || (effective_extra == "reject_file" && !extra.is_empty())
87 {
88 if entity.policy.severity == "abort" {
89 aborted = true;
90 action = report::MismatchAction::Aborted;
91 } else if entity.policy.severity == "reject" {
92 rejected = true;
93 action = report::MismatchAction::RejectedFile;
94 }
95 }
96
97 let mut errors = 0;
98 if rejected || aborted {
99 errors = 1;
100 } else {
101 let mut filled = false;
102 let mut ignored = false;
103 if effective_missing == "fill_nulls" && !missing.is_empty() {
104 add_missing_columns(raw_df, typed_df, declared_columns, &missing)?;
105 filled = true;
106 }
107 if effective_extra == "ignore" && !extra.is_empty() {
108 drop_extra_columns(raw_df, &extra)?;
109 drop_extra_columns(typed_df, &extra)?;
110 ignored = true;
111 }
112 if filled {
113 action = report::MismatchAction::FilledNulls;
114 } else if ignored {
115 action = report::MismatchAction::IgnoredExtras;
116 }
117 }
118
119 let warnings = if warning.is_some() { 1 } else { 0 };
120 let error = if rejected || aborted {
121 Some(report::MismatchIssue {
122 rule: "schema_mismatch".to_string(),
123 message: format!(
124 "entity.name={} schema mismatch: missing={} extra={}",
125 entity.name,
126 missing.len(),
127 extra.len()
128 ),
129 })
130 } else {
131 None
132 };
133
134 let mismatch_report = report::FileMismatch {
135 declared_columns_count: declared_names.len() as u64,
136 input_columns_count: input_names.len() as u64,
137 missing_columns: missing.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
138 extra_columns: extra.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
139 mismatch_action: action,
140 error,
141 warning,
142 };
143
144 Ok(MismatchOutcome {
145 report: mismatch_report,
146 rejected,
147 aborted,
148 warnings,
149 errors,
150 })
151}
152
153fn add_missing_columns(
154 raw_df: &mut DataFrame,
155 typed_df: &mut DataFrame,
156 declared_columns: &[config::ColumnConfig],
157 missing: &[String],
158) -> FloeResult<()> {
159 let mut types = HashMap::new();
160 for column in declared_columns {
161 types.insert(
162 column.name.as_str(),
163 config::parse_data_type(&column.column_type)?,
164 );
165 }
166
167 let height = raw_df.height();
168 for name in missing {
169 let raw_series = Series::full_null(name.as_str().into(), height, &DataType::String);
170 raw_df.with_column(raw_series).map_err(|err| {
171 Box::new(ConfigError(format!(
172 "failed to add missing column {}: {err}",
173 name
174 )))
175 })?;
176
177 let dtype = types
178 .get(name.as_str())
179 .cloned()
180 .unwrap_or(DataType::String);
181 let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
182 typed_df.with_column(typed_series).map_err(|err| {
183 Box::new(ConfigError(format!(
184 "failed to add missing column {}: {err}",
185 name
186 )))
187 })?;
188 }
189 Ok(())
190}
191
192fn drop_extra_columns(df: &mut DataFrame, extra: &[String]) -> FloeResult<()> {
193 for name in extra {
194 df.drop_in_place(name).map_err(|err| {
195 Box::new(ConfigError(format!(
196 "failed to drop extra column {}: {err}",
197 name
198 )))
199 })?;
200 }
201 Ok(())
202}