1use std::collections::HashMap;
2
3use polars::prelude::{DataFrame, DataType, Series};
4
5use crate::errors::RunError;
6use crate::{config, report, ConfigError, FloeResult};
7
/// Upper bound on how many missing / extra column names are copied into the
/// `report::FileMismatch` built by `plan_schema_mismatch`. The full lists are
/// still kept on `MismatchOutcome`; only the report copy is truncated.
const MAX_MISMATCH_COLUMNS: usize = 50;
9
/// Result of comparing the declared schema against the columns found in an
/// input file, together with the actions that should be applied to the data.
#[derive(Debug, Clone)]
pub struct MismatchOutcome {
    /// Report entry describing the mismatch (counts, truncated column lists,
    /// chosen action, optional error / warning).
    pub report: report::FileMismatch,
    /// `true` when a `reject_file` policy fired and `policy.severity` is `"reject"`.
    pub rejected: bool,
    /// `true` when a `reject_file` policy fired and `policy.severity` is `"abort"`.
    pub aborted: bool,
    /// Number of warnings produced by planning (0 or 1).
    pub warnings: u64,
    /// Number of errors produced by planning (1 when rejected or aborted, else 0).
    pub errors: u64,
    /// Declared column names absent from the input, sorted.
    pub missing: Vec<String>,
    /// Input column names that are not declared, sorted.
    pub extra: Vec<String>,
    /// When `true`, `apply_mismatch_plan` adds the `missing` columns as all-null columns.
    pub fill_missing: bool,
    /// When `true`, `apply_mismatch_plan` drops the `extra` columns.
    pub ignore_extra: bool,
}
22
23pub fn top_level_declared_columns(
24 columns: &[config::ColumnConfig],
25 normalize_strategy: Option<&str>,
26) -> FloeResult<Vec<config::ColumnConfig>> {
27 let mut resolved = Vec::new();
28 let mut seen = std::collections::HashSet::new();
29 for column in columns {
30 let source = column.source_or_name();
31 if source.contains('.')
32 || source.contains('[')
33 || source.contains('/')
34 || source.contains('@')
35 {
36 continue;
37 }
38 let normalized = if let Some(strategy) = normalize_strategy {
39 crate::checks::normalize::normalize_name(source, strategy)
40 } else {
41 source.to_string()
42 };
43 if !seen.insert(normalized.clone()) {
44 return Err(Box::new(ConfigError(format!(
45 "duplicate top-level column selector: {}",
46 normalized
47 ))));
48 }
49 resolved.push(config::ColumnConfig {
50 name: normalized,
51 source: None,
52 column_type: column.column_type.clone(),
53 nullable: column.nullable,
54 unique: column.unique,
55 width: column.width,
56 trim: column.trim,
57 });
58 }
59 Ok(resolved)
60}
61
62pub fn resolve_mismatch_columns(
63 entity: &config::EntityConfig,
64 normalized_columns: &[config::ColumnConfig],
65 normalize_strategy: Option<&str>,
66) -> FloeResult<Vec<config::ColumnConfig>> {
67 if entity.source.format == "json" || entity.source.format == "xml" {
68 top_level_declared_columns(&entity.schema.columns, normalize_strategy)
69 } else {
70 Ok(normalized_columns.to_vec())
71 }
72}
73
74pub fn plan_schema_mismatch(
75 entity: &config::EntityConfig,
76 declared_columns: &[config::ColumnConfig],
77 input_names: &[String],
78) -> FloeResult<MismatchOutcome> {
79 let normalize_strategy = crate::checks::normalize::resolve_normalize_strategy(entity)?;
80 let declared_names = declared_columns
81 .iter()
82 .map(|column| column.name.clone())
83 .collect::<Vec<_>>();
84 let input_names = match normalize_strategy.as_deref() {
85 Some(strategy) => input_names
86 .iter()
87 .map(|name| crate::checks::normalize::normalize_name(name, strategy))
88 .collect::<Vec<_>>(),
89 None => input_names.to_vec(),
90 };
91
92 let declared_set = declared_names
93 .iter()
94 .cloned()
95 .collect::<std::collections::HashSet<_>>();
96 let input_set = input_names
97 .iter()
98 .cloned()
99 .collect::<std::collections::HashSet<_>>();
100
101 let mut missing = declared_names
102 .iter()
103 .filter(|name| !input_set.contains(*name))
104 .cloned()
105 .collect::<Vec<_>>();
106 let mut extra = input_names
107 .iter()
108 .filter(|name| !declared_set.contains(*name))
109 .cloned()
110 .collect::<Vec<_>>();
111 missing.sort();
112 extra.sort();
113
114 let mismatch_config = entity.schema.mismatch.as_ref();
115 let missing_policy = mismatch_config
116 .and_then(|mismatch| mismatch.missing_columns.as_deref())
117 .unwrap_or("fill_nulls");
118 let extra_policy = mismatch_config
119 .and_then(|mismatch| mismatch.extra_columns.as_deref())
120 .unwrap_or("ignore");
121
122 let mut effective_missing = missing_policy;
123 let mut effective_extra = extra_policy;
124 let mut warning = None;
125 let rejection_requested = (effective_missing == "reject_file" && !missing.is_empty())
126 || (effective_extra == "reject_file" && !extra.is_empty());
127 if rejection_requested && entity.policy.severity == "warn" {
128 warning = Some(format!(
129 "entity.name={} schema mismatch requested reject_file but policy.severity=warn; continuing",
130 entity.name
131 ));
132 effective_missing = "fill_nulls";
133 effective_extra = "ignore";
134 }
135
136 let mut rejected = false;
137 let mut aborted = false;
138 let mut action = report::MismatchAction::None;
139 if (effective_missing == "reject_file" && !missing.is_empty())
140 || (effective_extra == "reject_file" && !extra.is_empty())
141 {
142 if entity.policy.severity == "abort" {
143 aborted = true;
144 action = report::MismatchAction::Aborted;
145 } else if entity.policy.severity == "reject" {
146 rejected = true;
147 action = report::MismatchAction::RejectedFile;
148 }
149 }
150
151 let mut errors = 0;
152 let mut fill_missing = false;
153 let mut ignore_extra = false;
154 if rejected || aborted {
155 errors = 1;
156 } else {
157 if effective_missing == "fill_nulls" && !missing.is_empty() {
158 fill_missing = true;
159 action = report::MismatchAction::FilledNulls;
160 }
161 if effective_extra == "ignore" && !extra.is_empty() {
162 ignore_extra = true;
163 if !fill_missing {
164 action = report::MismatchAction::IgnoredExtras;
165 }
166 }
167 }
168
169 let warnings = if warning.is_some() { 1 } else { 0 };
170 let error = if rejected || aborted {
171 Some(report::MismatchIssue {
172 rule: "schema_mismatch".to_string(),
173 message: format!(
174 "entity.name={} schema mismatch: missing={} extra={}",
175 entity.name,
176 missing.len(),
177 extra.len()
178 ),
179 })
180 } else {
181 None
182 };
183
184 let mismatch_report = report::FileMismatch {
185 declared_columns_count: declared_names.len() as u64,
186 input_columns_count: input_names.len() as u64,
187 missing_columns: missing.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
188 extra_columns: extra.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
189 mismatch_action: action,
190 error,
191 warning,
192 };
193
194 Ok(MismatchOutcome {
195 report: mismatch_report,
196 rejected,
197 aborted,
198 warnings,
199 errors,
200 missing,
201 extra,
202 fill_missing,
203 ignore_extra,
204 })
205}
206
207pub fn apply_schema_mismatch(
208 entity: &config::EntityConfig,
209 declared_columns: &[config::ColumnConfig],
210 input_names: &[String],
211 raw_df: Option<&mut DataFrame>,
212 typed_df: &mut DataFrame,
213) -> FloeResult<MismatchOutcome> {
214 let plan = plan_schema_mismatch(entity, declared_columns, input_names)?;
215 if !plan.rejected && !plan.aborted {
216 apply_mismatch_plan(&plan, declared_columns, raw_df, typed_df)?;
217 }
218 Ok(plan)
219}
220
221pub fn apply_mismatch_plan(
222 plan: &MismatchOutcome,
223 declared_columns: &[config::ColumnConfig],
224 mut raw_df: Option<&mut DataFrame>,
225 typed_df: &mut DataFrame,
226) -> FloeResult<()> {
227 if plan.fill_missing {
228 if let Some(raw_df) = raw_df.as_mut() {
229 add_missing_columns(raw_df, typed_df, declared_columns, &plan.missing)?;
230 } else {
231 add_missing_columns_typed(typed_df, declared_columns, &plan.missing)?;
232 }
233 }
234 if plan.ignore_extra {
235 if let Some(raw_df) = raw_df.as_mut() {
236 drop_extra_columns(raw_df, typed_df, &plan.extra)?;
237 } else {
238 drop_extra_columns_typed(typed_df, &plan.extra)?;
239 }
240 }
241 Ok(())
242}
243
244fn add_missing_columns(
245 raw_df: &mut DataFrame,
246 typed_df: &mut DataFrame,
247 declared_columns: &[config::ColumnConfig],
248 missing: &[String],
249) -> FloeResult<()> {
250 let mut types = HashMap::new();
251 for column in declared_columns {
252 types.insert(
253 column.name.as_str(),
254 config::parse_data_type(&column.column_type)?,
255 );
256 }
257
258 let height = raw_df.height();
259 for name in missing {
260 let raw_series = Series::full_null(name.as_str().into(), height, &DataType::String);
261 raw_df.with_column(raw_series).map_err(|err| {
262 Box::new(RunError(format!(
263 "failed to add missing column {}: {err}",
264 name
265 )))
266 })?;
267
268 let dtype = types
269 .get(name.as_str())
270 .cloned()
271 .unwrap_or(DataType::String);
272 let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
273 typed_df.with_column(typed_series).map_err(|err| {
274 Box::new(RunError(format!(
275 "failed to add missing column {}: {err}",
276 name
277 )))
278 })?;
279 }
280 Ok(())
281}
282
283fn add_missing_columns_typed(
284 typed_df: &mut DataFrame,
285 declared_columns: &[config::ColumnConfig],
286 missing: &[String],
287) -> FloeResult<()> {
288 let mut types = HashMap::new();
289 for column in declared_columns {
290 types.insert(
291 column.name.as_str(),
292 config::parse_data_type(&column.column_type)?,
293 );
294 }
295
296 let height = typed_df.height();
297 for name in missing {
298 let dtype = types
299 .get(name.as_str())
300 .cloned()
301 .unwrap_or(DataType::String);
302 let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
303 typed_df.with_column(typed_series).map_err(|err| {
304 Box::new(RunError(format!(
305 "failed to add missing column {}: {err}",
306 name
307 )))
308 })?;
309 }
310 Ok(())
311}
312
313fn drop_extra_columns(
314 raw_df: &mut DataFrame,
315 typed_df: &mut DataFrame,
316 extra: &[String],
317) -> FloeResult<()> {
318 for name in extra {
319 if raw_df.get_column_index(name).is_some() {
320 raw_df.drop_in_place(name).map_err(|err| {
321 Box::new(RunError(format!(
322 "failed to drop extra column {}: {err}",
323 name
324 )))
325 })?;
326 }
327 }
328
329 for name in extra {
330 if typed_df.get_column_index(name).is_some() {
331 typed_df.drop_in_place(name).map_err(|err| {
332 Box::new(RunError(format!(
333 "failed to drop extra column {}: {err}",
334 name
335 )))
336 })?;
337 }
338 }
339 Ok(())
340}
341
342fn drop_extra_columns_typed(typed_df: &mut DataFrame, extra: &[String]) -> FloeResult<()> {
343 for name in extra {
344 if typed_df.get_column_index(name).is_some() {
345 typed_df.drop_in_place(name).map_err(|err| {
346 Box::new(RunError(format!(
347 "failed to drop extra column {}: {err}",
348 name
349 )))
350 })?;
351 }
352 }
353 Ok(())
354}