1use std::collections::HashMap;
2
3use polars::prelude::{DataFrame, DataType, Series};
4
5use crate::config::PolicySeverity;
6use crate::errors::RunError;
7use crate::{config, report, ConfigError, FloeResult};
8
/// Upper bound on how many missing / extra column names are copied into a
/// single `report::FileMismatch`; the full lists stay on `MismatchOutcome`.
const MAX_MISMATCH_COLUMNS: usize = 50;
10
11#[derive(Debug, Clone)]
12pub struct MismatchOutcome {
13 pub report: report::FileMismatch,
14 pub rejected: bool,
15 pub aborted: bool,
16 pub warnings: u64,
17 pub errors: u64,
18 pub missing: Vec<String>,
19 pub extra: Vec<String>,
20 pub fill_missing: bool,
21 pub ignore_extra: bool,
22}
23
24pub fn top_level_declared_columns(
25 columns: &[config::ColumnConfig],
26 normalize_strategy: Option<&str>,
27) -> FloeResult<Vec<config::ColumnConfig>> {
28 let mut resolved = Vec::new();
29 let mut seen = std::collections::HashSet::new();
30 for column in columns {
31 let source = column.source_or_name();
32 if source.contains('.')
33 || source.contains('[')
34 || source.contains('/')
35 || source.contains('@')
36 {
37 continue;
38 }
39 let normalized = if let Some(strategy) = normalize_strategy {
40 crate::checks::normalize::normalize_name(source, strategy)
41 } else {
42 source.to_string()
43 };
44 if !seen.insert(normalized.clone()) {
45 return Err(Box::new(ConfigError(format!(
46 "duplicate top-level column selector: {}",
47 normalized
48 ))));
49 }
50 resolved.push(config::ColumnConfig {
51 name: normalized,
52 source: None,
53 column_type: column.column_type.clone(),
54 nullable: column.nullable,
55 unique: column.unique,
56 width: column.width,
57 trim: column.trim,
58 });
59 }
60 Ok(resolved)
61}
62
63pub fn resolve_mismatch_columns(
64 entity: &config::EntityConfig,
65 normalized_columns: &[config::ColumnConfig],
66 normalize_strategy: Option<&str>,
67) -> FloeResult<Vec<config::ColumnConfig>> {
68 if entity.source.format == "json" || entity.source.format == "xml" {
69 top_level_declared_columns(&entity.schema.columns, normalize_strategy)
70 } else {
71 Ok(normalized_columns.to_vec())
72 }
73}
74
75pub fn plan_schema_mismatch(
76 entity: &config::EntityConfig,
77 declared_columns: &[config::ColumnConfig],
78 input_names: &[String],
79) -> FloeResult<MismatchOutcome> {
80 let normalize_strategy = crate::checks::normalize::resolve_normalize_strategy(entity)?;
81 let declared_names = declared_columns
82 .iter()
83 .map(|column| column.name.clone())
84 .collect::<Vec<_>>();
85 let input_names = match normalize_strategy.as_deref() {
86 Some(strategy) => input_names
87 .iter()
88 .map(|name| crate::checks::normalize::normalize_name(name, strategy))
89 .collect::<Vec<_>>(),
90 None => input_names.to_vec(),
91 };
92
93 let declared_set = declared_names
94 .iter()
95 .cloned()
96 .collect::<std::collections::HashSet<_>>();
97 let input_set = input_names
98 .iter()
99 .cloned()
100 .collect::<std::collections::HashSet<_>>();
101
102 let mut missing = declared_names
103 .iter()
104 .filter(|name| !input_set.contains(*name))
105 .cloned()
106 .collect::<Vec<_>>();
107 let mut extra = input_names
108 .iter()
109 .filter(|name| !declared_set.contains(*name))
110 .cloned()
111 .collect::<Vec<_>>();
112 missing.sort();
113 extra.sort();
114
115 let mismatch_config = entity.schema.mismatch.as_ref();
116 let missing_policy = mismatch_config
117 .and_then(|mismatch| mismatch.missing_columns.as_deref())
118 .unwrap_or("fill_nulls");
119 let extra_policy = mismatch_config
120 .and_then(|mismatch| mismatch.extra_columns.as_deref())
121 .unwrap_or("ignore");
122
123 let mut effective_missing = missing_policy;
124 let mut effective_extra = extra_policy;
125 let mut warning = None;
126 let rejection_requested = (effective_missing == "reject_file" && !missing.is_empty())
127 || (effective_extra == "reject_file" && !extra.is_empty());
128 if rejection_requested && entity.policy.severity == PolicySeverity::Warn {
129 warning = Some(format!(
130 "entity.name={} schema mismatch requested reject_file but policy.severity=warn; continuing",
131 entity.name
132 ));
133 effective_missing = "fill_nulls";
134 effective_extra = "ignore";
135 }
136
137 let mut rejected = false;
138 let mut aborted = false;
139 let mut action = report::MismatchAction::None;
140 if (effective_missing == "reject_file" && !missing.is_empty())
141 || (effective_extra == "reject_file" && !extra.is_empty())
142 {
143 if entity.policy.severity == PolicySeverity::Abort {
144 aborted = true;
145 action = report::MismatchAction::Aborted;
146 } else if entity.policy.severity == PolicySeverity::Reject {
147 rejected = true;
148 action = report::MismatchAction::RejectedFile;
149 }
150 }
151
152 let mut errors = 0;
153 let mut fill_missing = false;
154 let mut ignore_extra = false;
155 if rejected || aborted {
156 errors = 1;
157 } else {
158 if effective_missing == "fill_nulls" && !missing.is_empty() {
159 fill_missing = true;
160 action = report::MismatchAction::FilledNulls;
161 }
162 if effective_extra == "ignore" && !extra.is_empty() {
163 ignore_extra = true;
164 if !fill_missing {
165 action = report::MismatchAction::IgnoredExtras;
166 }
167 }
168 }
169
170 let warnings = if warning.is_some() { 1 } else { 0 };
171 let error = if rejected || aborted {
172 Some(report::MismatchIssue {
173 rule: "schema_mismatch".to_string(),
174 message: format!(
175 "entity.name={} schema mismatch: missing={} extra={}",
176 entity.name,
177 missing.len(),
178 extra.len()
179 ),
180 })
181 } else {
182 None
183 };
184
185 let mismatch_report = report::FileMismatch {
186 declared_columns_count: declared_names.len() as u64,
187 input_columns_count: input_names.len() as u64,
188 missing_columns: missing.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
189 extra_columns: extra.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
190 mismatch_action: action,
191 error,
192 warning,
193 };
194
195 Ok(MismatchOutcome {
196 report: mismatch_report,
197 rejected,
198 aborted,
199 warnings,
200 errors,
201 missing,
202 extra,
203 fill_missing,
204 ignore_extra,
205 })
206}
207
208pub fn apply_schema_mismatch(
209 entity: &config::EntityConfig,
210 declared_columns: &[config::ColumnConfig],
211 input_names: &[String],
212 raw_df: Option<&mut DataFrame>,
213 typed_df: &mut DataFrame,
214) -> FloeResult<MismatchOutcome> {
215 let plan = plan_schema_mismatch(entity, declared_columns, input_names)?;
216 if !plan.rejected && !plan.aborted {
217 apply_mismatch_plan(&plan, declared_columns, raw_df, typed_df)?;
218 }
219 Ok(plan)
220}
221
222pub fn apply_mismatch_plan(
223 plan: &MismatchOutcome,
224 declared_columns: &[config::ColumnConfig],
225 mut raw_df: Option<&mut DataFrame>,
226 typed_df: &mut DataFrame,
227) -> FloeResult<()> {
228 if plan.fill_missing {
229 if let Some(raw_df) = raw_df.as_mut() {
230 add_missing_columns(raw_df, typed_df, declared_columns, &plan.missing)?;
231 } else {
232 add_missing_columns_typed(typed_df, declared_columns, &plan.missing)?;
233 }
234 }
235 if plan.ignore_extra {
236 if let Some(raw_df) = raw_df.as_mut() {
237 drop_extra_columns(raw_df, typed_df, &plan.extra)?;
238 } else {
239 drop_extra_columns_typed(typed_df, &plan.extra)?;
240 }
241 }
242 Ok(())
243}
244
245fn add_missing_columns(
246 raw_df: &mut DataFrame,
247 typed_df: &mut DataFrame,
248 declared_columns: &[config::ColumnConfig],
249 missing: &[String],
250) -> FloeResult<()> {
251 let mut types = HashMap::new();
252 for column in declared_columns {
253 types.insert(
254 column.name.as_str(),
255 config::parse_data_type(&column.column_type)?,
256 );
257 }
258
259 let height = raw_df.height();
260 for name in missing {
261 let raw_series = Series::full_null(name.as_str().into(), height, &DataType::String);
262 raw_df.with_column(raw_series).map_err(|err| {
263 Box::new(RunError(format!(
264 "failed to add missing column {}: {err}",
265 name
266 )))
267 })?;
268
269 let dtype = types
270 .get(name.as_str())
271 .cloned()
272 .unwrap_or(DataType::String);
273 let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
274 typed_df.with_column(typed_series).map_err(|err| {
275 Box::new(RunError(format!(
276 "failed to add missing column {}: {err}",
277 name
278 )))
279 })?;
280 }
281 Ok(())
282}
283
284fn add_missing_columns_typed(
285 typed_df: &mut DataFrame,
286 declared_columns: &[config::ColumnConfig],
287 missing: &[String],
288) -> FloeResult<()> {
289 let mut types = HashMap::new();
290 for column in declared_columns {
291 types.insert(
292 column.name.as_str(),
293 config::parse_data_type(&column.column_type)?,
294 );
295 }
296
297 let height = typed_df.height();
298 for name in missing {
299 let dtype = types
300 .get(name.as_str())
301 .cloned()
302 .unwrap_or(DataType::String);
303 let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
304 typed_df.with_column(typed_series).map_err(|err| {
305 Box::new(RunError(format!(
306 "failed to add missing column {}: {err}",
307 name
308 )))
309 })?;
310 }
311 Ok(())
312}
313
314fn drop_extra_columns(
315 raw_df: &mut DataFrame,
316 typed_df: &mut DataFrame,
317 extra: &[String],
318) -> FloeResult<()> {
319 for name in extra {
320 if raw_df.get_column_index(name).is_some() {
321 raw_df.drop_in_place(name).map_err(|err| {
322 Box::new(RunError(format!(
323 "failed to drop extra column {}: {err}",
324 name
325 )))
326 })?;
327 }
328 }
329
330 for name in extra {
331 if typed_df.get_column_index(name).is_some() {
332 typed_df.drop_in_place(name).map_err(|err| {
333 Box::new(RunError(format!(
334 "failed to drop extra column {}: {err}",
335 name
336 )))
337 })?;
338 }
339 }
340 Ok(())
341}
342
343fn drop_extra_columns_typed(typed_df: &mut DataFrame, extra: &[String]) -> FloeResult<()> {
344 for name in extra {
345 if typed_df.get_column_index(name).is_some() {
346 typed_df.drop_in_place(name).map_err(|err| {
347 Box::new(RunError(format!(
348 "failed to drop extra column {}: {err}",
349 name
350 )))
351 })?;
352 }
353 }
354 Ok(())
355}