1mod cast;
2mod mismatch;
3pub mod normalize;
4mod not_null;
5mod unique;
6
7use polars::prelude::{
8 BooleanChunked, ChunkFull, DataFrame, Expr, IntoLazy, IntoSeries, NamedFrom, NewChunkedArray,
9 Series,
10};
11use std::collections::{BTreeMap, HashMap};
12
13use crate::{ConfigError, FloeResult};
14
15pub use cast::{
16 cast_mismatch_counts, cast_mismatch_errors, cast_mismatch_errors_sparse, cast_mismatch_expr,
17};
18pub use mismatch::{
19 apply_mismatch_plan, apply_schema_mismatch, plan_schema_mismatch, resolve_mismatch_columns,
20 top_level_declared_columns, MismatchOutcome,
21};
22pub use not_null::{not_null_counts, not_null_errors, not_null_errors_sparse, not_null_expr};
23pub use unique::{
24 resolve_schema_unique_keys, unique_counts, unique_errors, unique_errors_sparse,
25 UniqueConstraint, UniqueConstraintResult, UniqueTracker,
26};
27
/// Lookup table from a column name to its zero-based position, as built by
/// [`column_index_map`].
pub type ColumnIndex = HashMap<String, usize>;
29
30pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
31 df.get_column_names()
32 .iter()
33 .enumerate()
34 .map(|(idx, name)| (name.to_string(), idx))
35 .collect()
36}
37
/// A single validation failure attached to one row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Name of the rule that was violated (this file emits e.g. `not_null` and `cast_error`).
    pub rule: String,
    /// Name of the column the violation refers to.
    pub column: String,
    /// Human-readable description of the violation.
    pub message: String,
}
44
/// Row errors stored sparsely: only rows that have at least one error occupy
/// an entry, while `row_count` remembers how long the dense outputs must be.
#[derive(Debug, Clone, Default)]
pub struct SparseRowErrors {
    /// Length of the dense vectors produced by `accept_rows` and
    /// `build_errors_formatted`.
    row_count: usize,
    /// Errors keyed by row index; the ordered map yields rows in ascending
    /// index order.
    rows: BTreeMap<usize, Vec<RowError>>,
}
50
51impl SparseRowErrors {
52 pub fn new(row_count: usize) -> Self {
53 Self {
54 row_count,
55 rows: BTreeMap::new(),
56 }
57 }
58
59 pub fn is_empty(&self) -> bool {
60 self.rows.is_empty()
61 }
62
63 pub fn add_error(&mut self, row_idx: usize, error: RowError) {
64 self.rows.entry(row_idx).or_default().push(error);
65 }
66
67 pub fn add_errors(&mut self, row_idx: usize, errors: Vec<RowError>) {
68 if errors.is_empty() {
69 return;
70 }
71 self.rows.entry(row_idx).or_default().extend(errors);
72 }
73
74 pub fn merge(&mut self, other: SparseRowErrors) {
75 for (row_idx, errors) in other.rows {
76 self.add_errors(row_idx, errors);
77 }
78 }
79
80 pub fn accept_rows(&self) -> Vec<bool> {
81 let mut accept_rows = vec![true; self.row_count];
82 for row_idx in self.rows.keys() {
83 if let Some(slot) = accept_rows.get_mut(*row_idx) {
84 *slot = false;
85 }
86 }
87 accept_rows
88 }
89
90 pub fn build_errors_formatted(&self, formatter: &dyn RowErrorFormatter) -> Vec<Option<String>> {
91 let mut errors_out = vec![None; self.row_count];
92 for (row_idx, errors) in &self.rows {
93 if let Some(slot) = errors_out.get_mut(*row_idx) {
94 *slot = Some(formatter.format(errors));
95 }
96 }
97 errors_out
98 }
99
100 pub fn iter(&self) -> impl Iterator<Item = (&usize, &Vec<RowError>)> {
101 self.rows.iter()
102 }
103
104 pub fn get(&self, row_idx: usize) -> Option<&Vec<RowError>> {
105 self.rows.get(&row_idx)
106 }
107
108 pub fn error_row_count(&self) -> u64 {
109 self.rows.len() as u64
110 }
111
112 pub fn violation_count(&self) -> u64 {
113 self.rows.values().map(|errors| errors.len() as u64).sum()
114 }
115}
116
117impl RowError {
118 pub fn new(rule: &str, column: &str, message: &str) -> Self {
119 Self {
120 rule: rule.to_string(),
121 column: column.to_string(),
122 message: message.to_string(),
123 }
124 }
125
126 pub fn to_json(&self) -> String {
127 self.to_json_with_source(None)
128 }
129
130 pub fn to_json_with_source(&self, source: Option<&str>) -> String {
131 match source {
132 Some(source) => format!(
133 "{{\"rule\":\"{}\",\"column\":\"{}\",\"source\":\"{}\",\"message\":\"{}\"}}",
134 json_escape(&self.rule),
135 json_escape(&self.column),
136 json_escape(source),
137 json_escape(&self.message)
138 ),
139 None => format!(
140 "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
141 json_escape(&self.rule),
142 json_escape(&self.column),
143 json_escape(&self.message)
144 ),
145 }
146 }
147}
148
/// Strategy for rendering all errors of a single row into one string.
pub trait RowErrorFormatter {
    /// Renders `errors` (the errors of one row) as a single string.
    fn format(&self, errors: &[RowError]) -> String;
}
152
/// Renders a row's errors as a JSON array of error objects.
#[derive(Default)]
pub struct JsonRowErrorFormatter {
    /// Optional lookup keyed by error column name; a hit is emitted as the
    /// `source` field of that error's JSON object.
    source_map: Option<HashMap<String, String>>,
}
/// Renders a row's errors as comma-separated lines (one per error), with the
/// whole text wrapped as a JSON string literal.
#[derive(Default)]
pub struct CsvRowErrorFormatter {
    /// Optional lookup keyed by error column name; when the map is present a
    /// source field is added to every line (empty if the column has no entry).
    source_map: Option<HashMap<String, String>>,
}
/// Renders a row's errors as `rule:column message` fragments joined by `"; "`,
/// with the whole text wrapped as a JSON string literal.
#[derive(Default)]
pub struct TextRowErrorFormatter {
    /// Optional lookup keyed by error column name; a hit adds a
    /// `source=<value>` part to that error's fragment.
    source_map: Option<HashMap<String, String>>,
}
165
166impl RowErrorFormatter for JsonRowErrorFormatter {
167 fn format(&self, errors: &[RowError]) -> String {
168 let json_items = errors
169 .iter()
170 .map(|error| {
171 let source = self
172 .source_map
173 .as_ref()
174 .and_then(|map| map.get(&error.column).map(|value| value.as_str()));
175 error.to_json_with_source(source)
176 })
177 .collect::<Vec<_>>()
178 .join(",");
179 format!("[{}]", json_items)
180 }
181}
182
183impl RowErrorFormatter for CsvRowErrorFormatter {
184 fn format(&self, errors: &[RowError]) -> String {
185 let lines = errors
186 .iter()
187 .map(|error| {
188 if let Some(source_map) = self.source_map.as_ref() {
189 let source = source_map
190 .get(&error.column)
191 .map(|value| value.as_str())
192 .unwrap_or("");
193 format!(
194 "{},{},{},{}",
195 csv_escape(&error.rule),
196 csv_escape(&error.column),
197 csv_escape(source),
198 csv_escape(&error.message)
199 )
200 } else {
201 format!(
202 "{},{},{}",
203 csv_escape(&error.rule),
204 csv_escape(&error.column),
205 csv_escape(&error.message)
206 )
207 }
208 })
209 .collect::<Vec<_>>()
210 .join("\n");
211 json_string(&lines)
212 }
213}
214
215impl RowErrorFormatter for TextRowErrorFormatter {
216 fn format(&self, errors: &[RowError]) -> String {
217 let text = errors
218 .iter()
219 .map(|error| {
220 if let Some(source_map) = self.source_map.as_ref() {
221 if let Some(source) = source_map.get(&error.column) {
222 return format!(
223 "{}:{} source={} {}",
224 error.rule, error.column, source, error.message
225 );
226 }
227 }
228 format!("{}:{} {}", error.rule, error.column, error.message)
229 })
230 .collect::<Vec<_>>()
231 .join("; ");
232 json_string(&text)
233 }
234}
235
236pub fn row_error_formatter(
237 name: &str,
238 source_map: Option<&HashMap<String, String>>,
239) -> FloeResult<Box<dyn RowErrorFormatter>> {
240 match name {
241 "json" => Ok(Box::new(JsonRowErrorFormatter {
242 source_map: source_map.cloned(),
243 })),
244 "csv" => Ok(Box::new(CsvRowErrorFormatter {
245 source_map: source_map.cloned(),
246 })),
247 "text" => Ok(Box::new(TextRowErrorFormatter {
248 source_map: source_map.cloned(),
249 })),
250 other => Err(Box::new(ConfigError(format!(
251 "unsupported report.formatter: {other}"
252 )))),
253 }
254}
255
/// Outcome of `run_expr_checks`.
pub struct ExprCheckResult {
    /// Per-row mask: `true` where every error column is null for that row.
    pub accept_mask: BooleanChunked,
    /// For each error column, its name paired with that column's null mask
    /// (`true` where the row has no error in that column).
    pub col_masks: Vec<(String, BooleanChunked)>,
    /// For each error column, its name paired with the number of non-null
    /// (i.e. violating) entries.
    pub col_violation_counts: Vec<(String, u64)>,
}
266
267impl ExprCheckResult {
268 pub fn all_accepted(height: usize) -> Self {
269 Self {
270 accept_mask: BooleanChunked::full("floe_accept".into(), true, height),
271 col_masks: Vec::new(),
272 col_violation_counts: Vec::new(),
273 }
274 }
275
276 pub fn total_violations(&self) -> u64 {
277 self.col_violation_counts.iter().map(|(_, c)| c).sum()
278 }
279}
280
281pub fn run_expr_checks(
290 df: &DataFrame,
291 raw_df: &DataFrame,
292 required_cols: &[String],
293 columns: &[crate::config::ColumnConfig],
294 track_cast: bool,
295) -> FloeResult<ExprCheckResult> {
296 let mut err_col_names: Vec<String> = Vec::new();
297
298 let not_null_exprs: Vec<Expr> = required_cols
300 .iter()
301 .map(|col_name| {
302 let (err_col, expr) = not_null::not_null_expr(col_name);
303 err_col_names.push(err_col);
304 expr
305 })
306 .collect();
307
308 let mut checked = if not_null_exprs.is_empty() {
309 df.clone()
310 } else {
311 df.clone()
312 .lazy()
313 .with_columns(not_null_exprs)
314 .collect()
315 .map_err(|e| {
316 Box::new(crate::errors::RunError(format!(
317 "run_expr_checks: not_null evaluation failed: {e}"
318 ))) as Box<dyn std::error::Error + Send + Sync>
319 })?
320 };
321
322 if track_cast {
325 for c in columns {
326 if cast::is_string_type(&c.column_type) {
327 continue;
328 }
329 let raw_not_null = raw_df
330 .column(&c.name)
331 .map_err(|e| {
332 Box::new(crate::errors::RunError(format!(
333 "run_expr_checks: raw column '{}' not found: {e}",
334 c.name
335 ))) as Box<dyn std::error::Error + Send + Sync>
336 })?
337 .is_not_null();
338
339 let typed_null = df
340 .column(&c.name)
341 .map_err(|e| {
342 Box::new(crate::errors::RunError(format!(
343 "run_expr_checks: typed column '{}' not found: {e}",
344 c.name
345 ))) as Box<dyn std::error::Error + Send + Sync>
346 })?
347 .is_null();
348
349 let error_mask = &typed_null & &raw_not_null;
350 let err_col_name = format!("_e_cast_{}", c.name);
351 let error_json =
352 RowError::new("cast_error", &c.name, "invalid value for target type").to_json();
353
354 let cast_err_series = bool_mask_to_error_series(&err_col_name, error_mask, &error_json);
355 checked.with_column(cast_err_series).map_err(|e| {
356 Box::new(crate::errors::RunError(format!(
357 "run_expr_checks: could not attach cast error column '{}': {e}",
358 err_col_name
359 ))) as Box<dyn std::error::Error + Send + Sync>
360 })?;
361 err_col_names.push(err_col_name);
362 }
363 }
364
365 if err_col_names.is_empty() {
366 return Ok(ExprCheckResult::all_accepted(df.height()));
367 }
368
369 let mut accept_mask = BooleanChunked::full("floe_accept".into(), true, checked.height());
370 let mut col_masks: Vec<(String, BooleanChunked)> = Vec::with_capacity(err_col_names.len());
371 let mut col_violation_counts: Vec<(String, u64)> = Vec::with_capacity(err_col_names.len());
372
373 for err_col in &err_col_names {
374 let col = checked.column(err_col).map_err(|e| {
375 Box::new(crate::errors::RunError(format!(
376 "run_expr_checks: error column '{err_col}' missing after eval: {e}"
377 ))) as Box<dyn std::error::Error + Send + Sync>
378 })?;
379 let null_mask = col.is_null();
380 accept_mask = &accept_mask & &null_mask;
381 let violations = (col.len() - col.null_count()) as u64;
382 col_violation_counts.push((err_col.clone(), violations));
383 col_masks.push((err_col.clone(), null_mask));
384 }
385
386 Ok(ExprCheckResult {
387 accept_mask,
388 col_masks,
389 col_violation_counts,
390 })
391}
392
393fn bool_mask_to_error_series(
396 col_name: &str,
397 error_mask: BooleanChunked,
398 error_json: &str,
399) -> Series {
400 use polars::prelude::StringChunked;
401 let ca: StringChunked = error_mask
402 .into_iter()
403 .map(|opt_b| {
404 if opt_b == Some(true) {
405 Some(error_json)
406 } else {
407 None
408 }
409 })
410 .collect();
411 ca.with_name(col_name.into()).into_series()
412}
413
414pub fn build_errors_formatted_expr(
418 height: usize,
419 accept_mask: &BooleanChunked,
420 col_masks: &[(String, BooleanChunked)],
421 unique_errors: &SparseRowErrors,
422 formatter: &dyn RowErrorFormatter,
423) -> Vec<Option<String>> {
424 let mut out = vec![None; height];
425 for (row_idx, slot) in out.iter_mut().enumerate() {
426 if accept_mask.get(row_idx) == Some(true) {
427 continue;
428 }
429 let mut row_errors: Vec<RowError> = Vec::new();
430 for (err_col_name, null_mask) in col_masks {
431 if null_mask.get(row_idx) == Some(false) {
432 if let Some(col) = err_col_name.strip_prefix("_e_nn_") {
433 row_errors.push(RowError::new("not_null", col, "required value missing"));
434 } else if let Some(col) = err_col_name.strip_prefix("_e_cast_") {
435 row_errors.push(RowError::new(
436 "cast_error",
437 col,
438 "invalid value for target type",
439 ));
440 }
441 }
442 }
443 if let Some(unique_row_errors) = unique_errors.get(row_idx) {
444 row_errors.extend(unique_row_errors.iter().cloned());
445 }
446 if !row_errors.is_empty() {
447 *slot = Some(formatter.format(&row_errors));
448 }
449 }
450 out
451}
452
453pub fn accept_mask_from_error_cols(
454 df: &DataFrame,
455 err_cols: &[&str],
456) -> FloeResult<BooleanChunked> {
457 let mut accept_mask = BooleanChunked::full("floe_accept".into(), true, df.height());
458 for err_col in err_cols {
459 let errors = df.column(err_col).map_err(|err| {
460 Box::new(ConfigError(format!(
461 "error column {err_col} not found: {err}"
462 )))
463 })?;
464 let no_error = errors.is_null();
465 accept_mask = &accept_mask & &no_error;
466 }
467 Ok(accept_mask)
468}
469
470pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
471 let mut accept_rows = Vec::with_capacity(errors_per_row.len());
472 for errors in errors_per_row {
473 accept_rows.push(errors.is_empty());
474 }
475 accept_rows
476}
477
478pub fn build_errors_json(
479 errors_per_row: &[Vec<RowError>],
480 accept_rows: &[bool],
481) -> Vec<Option<String>> {
482 let formatter = JsonRowErrorFormatter { source_map: None };
483 build_errors_formatted(errors_per_row, accept_rows, &formatter)
484}
485
486pub fn build_errors_formatted(
487 errors_per_row: &[Vec<RowError>],
488 accept_rows: &[bool],
489 formatter: &dyn RowErrorFormatter,
490) -> Vec<Option<String>> {
491 let mut errors_out = Vec::with_capacity(errors_per_row.len());
492 for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
493 if *accepted {
494 errors_out.push(None);
495 continue;
496 }
497 errors_out.push(Some(formatter.format(errors)));
498 }
499 errors_out
500}
501
502pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
503 let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
504 let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
505 let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
506 (accept_mask, reject_mask)
507}
508
509pub fn rejected_error_columns(
510 errors_per_row: &[Option<String>],
511 include_all_rows: bool,
512) -> (Series, Series) {
513 if include_all_rows {
514 let mut row_index = Vec::with_capacity(errors_per_row.len());
515 let mut errors = Vec::with_capacity(errors_per_row.len());
516 for (idx, err) in errors_per_row.iter().enumerate() {
517 row_index.push(idx as u64);
518 errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
519 }
520 (
521 Series::new("__floe_row_index".into(), row_index),
522 Series::new("__floe_errors".into(), errors),
523 )
524 } else {
525 let mut row_index = Vec::new();
526 let mut errors = Vec::new();
527 for (idx, err) in errors_per_row.iter().enumerate() {
528 if let Some(err) = err {
529 row_index.push(idx as u64);
530 errors.push(err.clone());
531 }
532 }
533 (
534 Series::new("__floe_row_index".into(), row_index),
535 Series::new("__floe_errors".into(), errors),
536 )
537 }
538}
539
/// Escapes `value` so it can be placed between the quotes of a JSON string.
///
/// Backslash and double quote are backslash-escaped, and `\n`, `\r`, `\t`,
/// backspace and form feed use their short escapes. Every other control
/// character below U+0020 is written as `\u00XX`, since JSON forbids raw
/// control characters inside strings. All other characters are copied
/// unchanged.
fn json_escape(value: &str) -> String {
    use std::fmt::Write as _;

    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            control if (control as u32) < 0x20 => {
                // Writing into a `String` cannot fail, so the result is ignored.
                let _ = write!(escaped, "\\u{:04x}", control as u32);
            }
            other => escaped.push(other),
        }
    }
    escaped
}
548
549fn json_string(value: &str) -> String {
550 format!("\"{}\"", json_escape(value))
551}
552
/// Escapes `value` for use as a single CSV field.
///
/// A field containing a double quote, a comma, or a line break is enclosed
/// in double quotes, with embedded double quotes doubled. Any other value is
/// returned unchanged. The quote check matters: a field with a doubled quote
/// that is not itself enclosed in quotes is malformed CSV.
fn csv_escape(value: &str) -> String {
    let needs_quotes = value.contains(|c: char| matches!(c, '"' | ',' | '\n' | '\r'));
    if needs_quotes {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}
560}