1use std::collections::HashMap;
2
3use polars::prelude::{DataFrame, DataType, Series};
4
5use crate::errors::RunError;
6use crate::{config, report, FloeResult};
7
/// Upper bound on how many missing / extra column names are copied into a
/// `report::FileMismatch`. The untruncated lists are kept on `MismatchOutcome`.
const MAX_MISMATCH_COLUMNS: usize = 50;
9
/// Outcome of comparing a file's input columns against an entity's declared schema.
///
/// The field descriptions below reflect how `plan_schema_mismatch` populates them;
/// `apply_mismatch_plan` reads `fill_missing`, `ignore_extra`, `missing` and `extra`
/// to reshape the data frames.
#[derive(Debug, Clone)]
pub struct MismatchOutcome {
    /// Per-file report entry; its column-name lists are truncated to `MAX_MISMATCH_COLUMNS`.
    pub report: report::FileMismatch,
    /// Set when a triggered `reject_file` policy runs under `policy.severity == "reject"`.
    pub rejected: bool,
    /// Set when a triggered `reject_file` policy runs under `policy.severity == "abort"`.
    pub aborted: bool,
    /// 1 when a `reject_file` request was downgraded because `policy.severity == "warn"`, else 0.
    pub warnings: u64,
    /// 1 when the file was rejected or aborted, else 0.
    pub errors: u64,
    /// Declared column names absent from the input, sorted (full list, not truncated).
    pub missing: Vec<String>,
    /// Input column names (normalized, when a strategy is configured) that are not declared,
    /// sorted (full list, not truncated).
    pub extra: Vec<String>,
    /// Whether missing columns should be added as all-null columns.
    pub fill_missing: bool,
    /// Whether extra columns should be dropped.
    pub ignore_extra: bool,
}
22
23pub fn plan_schema_mismatch(
24 entity: &config::EntityConfig,
25 declared_columns: &[config::ColumnConfig],
26 input_names: &[String],
27) -> FloeResult<MismatchOutcome> {
28 let normalize_strategy = crate::run::normalize::resolve_normalize_strategy(entity)?;
29 let declared_names = declared_columns
30 .iter()
31 .map(|column| column.name.clone())
32 .collect::<Vec<_>>();
33 let input_names = match normalize_strategy.as_deref() {
34 Some(strategy) => input_names
35 .iter()
36 .map(|name| crate::run::normalize::normalize_name(name, strategy))
37 .collect::<Vec<_>>(),
38 None => input_names.to_vec(),
39 };
40
41 let declared_set = declared_names
42 .iter()
43 .cloned()
44 .collect::<std::collections::HashSet<_>>();
45 let input_set = input_names
46 .iter()
47 .cloned()
48 .collect::<std::collections::HashSet<_>>();
49
50 let mut missing = declared_names
51 .iter()
52 .filter(|name| !input_set.contains(*name))
53 .cloned()
54 .collect::<Vec<_>>();
55 let mut extra = input_names
56 .iter()
57 .filter(|name| !declared_set.contains(*name))
58 .cloned()
59 .collect::<Vec<_>>();
60 missing.sort();
61 extra.sort();
62
63 let mismatch_config = entity.schema.mismatch.as_ref();
64 let missing_policy = mismatch_config
65 .and_then(|mismatch| mismatch.missing_columns.as_deref())
66 .unwrap_or("fill_nulls");
67 let extra_policy = mismatch_config
68 .and_then(|mismatch| mismatch.extra_columns.as_deref())
69 .unwrap_or("ignore");
70
71 let mut effective_missing = missing_policy;
72 let mut effective_extra = extra_policy;
73 let mut warning = None;
74 let rejection_requested = (effective_missing == "reject_file" && !missing.is_empty())
75 || (effective_extra == "reject_file" && !extra.is_empty());
76 if rejection_requested && entity.policy.severity == "warn" {
77 warning = Some(format!(
78 "entity.name={} schema mismatch requested reject_file but policy.severity=warn; continuing",
79 entity.name
80 ));
81 effective_missing = "fill_nulls";
82 effective_extra = "ignore";
83 }
84
85 let mut rejected = false;
86 let mut aborted = false;
87 let mut action = report::MismatchAction::None;
88 if (effective_missing == "reject_file" && !missing.is_empty())
89 || (effective_extra == "reject_file" && !extra.is_empty())
90 {
91 if entity.policy.severity == "abort" {
92 aborted = true;
93 action = report::MismatchAction::Aborted;
94 } else if entity.policy.severity == "reject" {
95 rejected = true;
96 action = report::MismatchAction::RejectedFile;
97 }
98 }
99
100 let mut errors = 0;
101 let mut fill_missing = false;
102 let mut ignore_extra = false;
103 if rejected || aborted {
104 errors = 1;
105 } else {
106 if effective_missing == "fill_nulls" && !missing.is_empty() {
107 fill_missing = true;
108 action = report::MismatchAction::FilledNulls;
109 }
110 if effective_extra == "ignore" && !extra.is_empty() {
111 ignore_extra = true;
112 if !fill_missing {
113 action = report::MismatchAction::IgnoredExtras;
114 }
115 }
116 }
117
118 let warnings = if warning.is_some() { 1 } else { 0 };
119 let error = if rejected || aborted {
120 Some(report::MismatchIssue {
121 rule: "schema_mismatch".to_string(),
122 message: format!(
123 "entity.name={} schema mismatch: missing={} extra={}",
124 entity.name,
125 missing.len(),
126 extra.len()
127 ),
128 })
129 } else {
130 None
131 };
132
133 let mismatch_report = report::FileMismatch {
134 declared_columns_count: declared_names.len() as u64,
135 input_columns_count: input_names.len() as u64,
136 missing_columns: missing.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
137 extra_columns: extra.iter().take(MAX_MISMATCH_COLUMNS).cloned().collect(),
138 mismatch_action: action,
139 error,
140 warning,
141 };
142
143 Ok(MismatchOutcome {
144 report: mismatch_report,
145 rejected,
146 aborted,
147 warnings,
148 errors,
149 missing,
150 extra,
151 fill_missing,
152 ignore_extra,
153 })
154}
155
156pub fn apply_schema_mismatch(
157 entity: &config::EntityConfig,
158 declared_columns: &[config::ColumnConfig],
159 input_names: &[String],
160 raw_df: Option<&mut DataFrame>,
161 typed_df: &mut DataFrame,
162) -> FloeResult<MismatchOutcome> {
163 let plan = plan_schema_mismatch(entity, declared_columns, input_names)?;
164 if !plan.rejected && !plan.aborted {
165 apply_mismatch_plan(&plan, declared_columns, raw_df, typed_df)?;
166 }
167 Ok(plan)
168}
169
170pub fn apply_mismatch_plan(
171 plan: &MismatchOutcome,
172 declared_columns: &[config::ColumnConfig],
173 mut raw_df: Option<&mut DataFrame>,
174 typed_df: &mut DataFrame,
175) -> FloeResult<()> {
176 if plan.fill_missing {
177 if let Some(raw_df) = raw_df.as_mut() {
178 add_missing_columns(raw_df, typed_df, declared_columns, &plan.missing)?;
179 } else {
180 add_missing_columns_typed(typed_df, declared_columns, &plan.missing)?;
181 }
182 }
183 if plan.ignore_extra {
184 if let Some(raw_df) = raw_df.as_mut() {
185 drop_extra_columns(raw_df, typed_df, &plan.extra)?;
186 } else {
187 drop_extra_columns_typed(typed_df, &plan.extra)?;
188 }
189 }
190 Ok(())
191}
192
193fn add_missing_columns(
194 raw_df: &mut DataFrame,
195 typed_df: &mut DataFrame,
196 declared_columns: &[config::ColumnConfig],
197 missing: &[String],
198) -> FloeResult<()> {
199 let mut types = HashMap::new();
200 for column in declared_columns {
201 types.insert(
202 column.name.as_str(),
203 config::parse_data_type(&column.column_type)?,
204 );
205 }
206
207 let height = raw_df.height();
208 for name in missing {
209 let raw_series = Series::full_null(name.as_str().into(), height, &DataType::String);
210 raw_df.with_column(raw_series).map_err(|err| {
211 Box::new(RunError(format!(
212 "failed to add missing column {}: {err}",
213 name
214 )))
215 })?;
216
217 let dtype = types
218 .get(name.as_str())
219 .cloned()
220 .unwrap_or(DataType::String);
221 let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
222 typed_df.with_column(typed_series).map_err(|err| {
223 Box::new(RunError(format!(
224 "failed to add missing column {}: {err}",
225 name
226 )))
227 })?;
228 }
229 Ok(())
230}
231
232fn add_missing_columns_typed(
233 typed_df: &mut DataFrame,
234 declared_columns: &[config::ColumnConfig],
235 missing: &[String],
236) -> FloeResult<()> {
237 let mut types = HashMap::new();
238 for column in declared_columns {
239 types.insert(
240 column.name.as_str(),
241 config::parse_data_type(&column.column_type)?,
242 );
243 }
244
245 let height = typed_df.height();
246 for name in missing {
247 let dtype = types
248 .get(name.as_str())
249 .cloned()
250 .unwrap_or(DataType::String);
251 let typed_series = Series::full_null(name.as_str().into(), height, &dtype);
252 typed_df.with_column(typed_series).map_err(|err| {
253 Box::new(RunError(format!(
254 "failed to add missing column {}: {err}",
255 name
256 )))
257 })?;
258 }
259 Ok(())
260}
261
262fn drop_extra_columns(
263 raw_df: &mut DataFrame,
264 typed_df: &mut DataFrame,
265 extra: &[String],
266) -> FloeResult<()> {
267 for name in extra {
268 if raw_df.get_column_index(name).is_some() {
269 raw_df.drop_in_place(name).map_err(|err| {
270 Box::new(RunError(format!(
271 "failed to drop extra column {}: {err}",
272 name
273 )))
274 })?;
275 }
276 }
277
278 for name in extra {
279 if typed_df.get_column_index(name).is_some() {
280 typed_df.drop_in_place(name).map_err(|err| {
281 Box::new(RunError(format!(
282 "failed to drop extra column {}: {err}",
283 name
284 )))
285 })?;
286 }
287 }
288 Ok(())
289}
290
291fn drop_extra_columns_typed(typed_df: &mut DataFrame, extra: &[String]) -> FloeResult<()> {
292 for name in extra {
293 if typed_df.get_column_index(name).is_some() {
294 typed_df.drop_in_place(name).map_err(|err| {
295 Box::new(RunError(format!(
296 "failed to drop extra column {}: {err}",
297 name
298 )))
299 })?;
300 }
301 }
302 Ok(())
303}