Skip to main content

floe_core/checks/
mod.rs

1mod cast;
2mod mismatch;
3pub mod normalize;
4mod not_null;
5mod unique;
6
7use polars::prelude::{
8    BooleanChunked, ChunkFull, DataFrame, Expr, IntoLazy, IntoSeries, NamedFrom, NewChunkedArray,
9    Series,
10};
11use std::collections::{BTreeMap, HashMap};
12
13use crate::{ConfigError, FloeResult};
14
15pub use cast::{
16    cast_mismatch_counts, cast_mismatch_errors, cast_mismatch_errors_sparse, cast_mismatch_expr,
17};
18pub use mismatch::{
19    apply_mismatch_plan, apply_schema_mismatch, plan_schema_mismatch, resolve_mismatch_columns,
20    top_level_declared_columns, MismatchOutcome,
21};
22pub use not_null::{not_null_counts, not_null_errors, not_null_errors_sparse, not_null_expr};
23pub use unique::{
24    resolve_schema_unique_keys, unique_counts, unique_errors, unique_errors_sparse,
25    UniqueConstraint, UniqueConstraintResult, UniqueTracker,
26};
27
28pub type ColumnIndex = HashMap<String, usize>;
29
30pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
31    df.get_column_names()
32        .iter()
33        .enumerate()
34        .map(|(idx, name)| (name.to_string(), idx))
35        .collect()
36}
37
/// A single validation failure attached to one row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Identifier of the rule that failed (this module emits e.g. `not_null` and `cast_error`).
    pub rule: String,
    /// Name of the column the failure refers to.
    pub column: String,
    /// Human-readable description of the failure.
    pub message: String,
}
44
45#[derive(Debug, Clone, Default)]
46pub struct SparseRowErrors {
47    row_count: usize,
48    rows: BTreeMap<usize, Vec<RowError>>,
49}
50
51impl SparseRowErrors {
52    pub fn new(row_count: usize) -> Self {
53        Self {
54            row_count,
55            rows: BTreeMap::new(),
56        }
57    }
58
59    pub fn is_empty(&self) -> bool {
60        self.rows.is_empty()
61    }
62
63    pub fn add_error(&mut self, row_idx: usize, error: RowError) {
64        self.rows.entry(row_idx).or_default().push(error);
65    }
66
67    pub fn add_errors(&mut self, row_idx: usize, errors: Vec<RowError>) {
68        if errors.is_empty() {
69            return;
70        }
71        self.rows.entry(row_idx).or_default().extend(errors);
72    }
73
74    pub fn merge(&mut self, other: SparseRowErrors) {
75        for (row_idx, errors) in other.rows {
76            self.add_errors(row_idx, errors);
77        }
78    }
79
80    pub fn accept_rows(&self) -> Vec<bool> {
81        let mut accept_rows = vec![true; self.row_count];
82        for row_idx in self.rows.keys() {
83            if let Some(slot) = accept_rows.get_mut(*row_idx) {
84                *slot = false;
85            }
86        }
87        accept_rows
88    }
89
90    pub fn build_errors_formatted(&self, formatter: &dyn RowErrorFormatter) -> Vec<Option<String>> {
91        let mut errors_out = vec![None; self.row_count];
92        for (row_idx, errors) in &self.rows {
93            if let Some(slot) = errors_out.get_mut(*row_idx) {
94                *slot = Some(formatter.format(errors));
95            }
96        }
97        errors_out
98    }
99
100    pub fn iter(&self) -> impl Iterator<Item = (&usize, &Vec<RowError>)> {
101        self.rows.iter()
102    }
103
104    pub fn get(&self, row_idx: usize) -> Option<&Vec<RowError>> {
105        self.rows.get(&row_idx)
106    }
107
108    pub fn error_row_count(&self) -> u64 {
109        self.rows.len() as u64
110    }
111
112    pub fn violation_count(&self) -> u64 {
113        self.rows.values().map(|errors| errors.len() as u64).sum()
114    }
115}
116
117impl RowError {
118    pub fn new(rule: &str, column: &str, message: &str) -> Self {
119        Self {
120            rule: rule.to_string(),
121            column: column.to_string(),
122            message: message.to_string(),
123        }
124    }
125
126    pub fn to_json(&self) -> String {
127        self.to_json_with_source(None)
128    }
129
130    pub fn to_json_with_source(&self, source: Option<&str>) -> String {
131        match source {
132            Some(source) => format!(
133                "{{\"rule\":\"{}\",\"column\":\"{}\",\"source\":\"{}\",\"message\":\"{}\"}}",
134                json_escape(&self.rule),
135                json_escape(&self.column),
136                json_escape(source),
137                json_escape(&self.message)
138            ),
139            None => format!(
140                "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
141                json_escape(&self.rule),
142                json_escape(&self.column),
143                json_escape(&self.message)
144            ),
145        }
146    }
147}
148
/// Strategy for rendering the errors of one row as a single string.
pub trait RowErrorFormatter {
    /// Renders `errors` as one string.
    fn format(&self, errors: &[RowError]) -> String;
}
152
/// Formatter that renders a row's errors as a JSON array of objects.
#[derive(Default)]
pub struct JsonRowErrorFormatter {
    /// Optional map looked up by error column name; a hit is emitted as the
    /// object's `source` field.
    source_map: Option<HashMap<String, String>>,
}
/// Formatter that renders a row's errors as comma-separated lines, one per
/// error, wrapped as a JSON string.
#[derive(Default)]
pub struct CsvRowErrorFormatter {
    /// Optional map looked up by error column name; when the map is present a
    /// source field is always emitted (empty if the column has no entry).
    source_map: Option<HashMap<String, String>>,
}
/// Formatter that renders a row's errors as `rule:column message` fragments
/// joined by `"; "`, wrapped as a JSON string.
#[derive(Default)]
pub struct TextRowErrorFormatter {
    /// Optional map looked up by error column name; a hit adds a
    /// `source=<value>` part to the fragment.
    source_map: Option<HashMap<String, String>>,
}
165
166impl RowErrorFormatter for JsonRowErrorFormatter {
167    fn format(&self, errors: &[RowError]) -> String {
168        let json_items = errors
169            .iter()
170            .map(|error| {
171                let source = self
172                    .source_map
173                    .as_ref()
174                    .and_then(|map| map.get(&error.column).map(|value| value.as_str()));
175                error.to_json_with_source(source)
176            })
177            .collect::<Vec<_>>()
178            .join(",");
179        format!("[{}]", json_items)
180    }
181}
182
183impl RowErrorFormatter for CsvRowErrorFormatter {
184    fn format(&self, errors: &[RowError]) -> String {
185        let lines = errors
186            .iter()
187            .map(|error| {
188                if let Some(source_map) = self.source_map.as_ref() {
189                    let source = source_map
190                        .get(&error.column)
191                        .map(|value| value.as_str())
192                        .unwrap_or("");
193                    format!(
194                        "{},{},{},{}",
195                        csv_escape(&error.rule),
196                        csv_escape(&error.column),
197                        csv_escape(source),
198                        csv_escape(&error.message)
199                    )
200                } else {
201                    format!(
202                        "{},{},{}",
203                        csv_escape(&error.rule),
204                        csv_escape(&error.column),
205                        csv_escape(&error.message)
206                    )
207                }
208            })
209            .collect::<Vec<_>>()
210            .join("\n");
211        json_string(&lines)
212    }
213}
214
215impl RowErrorFormatter for TextRowErrorFormatter {
216    fn format(&self, errors: &[RowError]) -> String {
217        let text = errors
218            .iter()
219            .map(|error| {
220                if let Some(source_map) = self.source_map.as_ref() {
221                    if let Some(source) = source_map.get(&error.column) {
222                        return format!(
223                            "{}:{} source={} {}",
224                            error.rule, error.column, source, error.message
225                        );
226                    }
227                }
228                format!("{}:{} {}", error.rule, error.column, error.message)
229            })
230            .collect::<Vec<_>>()
231            .join("; ");
232        json_string(&text)
233    }
234}
235
236pub fn row_error_formatter(
237    name: &str,
238    source_map: Option<&HashMap<String, String>>,
239) -> FloeResult<Box<dyn RowErrorFormatter>> {
240    match name {
241        "json" => Ok(Box::new(JsonRowErrorFormatter {
242            source_map: source_map.cloned(),
243        })),
244        "csv" => Ok(Box::new(CsvRowErrorFormatter {
245            source_map: source_map.cloned(),
246        })),
247        "text" => Ok(Box::new(TextRowErrorFormatter {
248            source_map: source_map.cloned(),
249        })),
250        other => Err(Box::new(ConfigError(format!(
251            "unsupported report.formatter: {other}"
252        )))),
253    }
254}
255
256/// Result of running expression-based not_null and cast_mismatch checks.
257pub struct ExprCheckResult {
258    /// True for each row that passed all expression checks.
259    pub accept_mask: BooleanChunked,
260    /// Per error column: (column_name, is_null mask).
261    /// `is_null[i] == Some(false)` means row `i` has an error for that check.
262    pub col_masks: Vec<(String, BooleanChunked)>,
263    /// Per error column: (column_name, violation_count). Zero-count entries are omitted.
264    pub col_violation_counts: Vec<(String, u64)>,
265}
266
267impl ExprCheckResult {
268    pub fn all_accepted(height: usize) -> Self {
269        Self {
270            accept_mask: BooleanChunked::full("floe_accept".into(), true, height),
271            col_masks: Vec::new(),
272            col_violation_counts: Vec::new(),
273        }
274    }
275
276    pub fn total_violations(&self) -> u64 {
277        self.col_violation_counts.iter().map(|(_, c)| c).sum()
278    }
279}
280
281/// Runs not_null and cast_mismatch checks using Polars columnar operations.
282///
283/// not_null checks are evaluated via a single lazy expression pass on `df`.
284/// cast_mismatch checks are computed directly from `BooleanChunked` masks derived from
285/// the raw and typed DataFrames, avoiding the need to combine them into one DataFrame.
286///
287/// `track_cast` should only be true when cast errors are known to exist (from a prior
288/// columnar count pass), so the cast path is skipped entirely on the happy path.
289pub fn run_expr_checks(
290    df: &DataFrame,
291    raw_df: &DataFrame,
292    required_cols: &[String],
293    columns: &[crate::config::ColumnConfig],
294    track_cast: bool,
295) -> FloeResult<ExprCheckResult> {
296    let mut err_col_names: Vec<String> = Vec::new();
297
298    // Apply not_null expressions in a single lazy pass.
299    let not_null_exprs: Vec<Expr> = required_cols
300        .iter()
301        .map(|col_name| {
302            let (err_col, expr) = not_null::not_null_expr(col_name);
303            err_col_names.push(err_col);
304            expr
305        })
306        .collect();
307
308    let mut checked = if not_null_exprs.is_empty() {
309        df.clone()
310    } else {
311        df.clone()
312            .lazy()
313            .with_columns(not_null_exprs)
314            .collect()
315            .map_err(|e| {
316                Box::new(crate::errors::RunError(format!(
317                    "run_expr_checks: not_null evaluation failed: {e}"
318                ))) as Box<dyn std::error::Error + Send + Sync>
319            })?
320    };
321
322    // Compute cast error columns directly from BooleanChunked masks, avoiding any
323    // need to join or hstack raw and typed DataFrames.
324    if track_cast {
325        for c in columns {
326            if cast::is_string_type(&c.column_type) {
327                continue;
328            }
329            let raw_not_null = raw_df
330                .column(&c.name)
331                .map_err(|e| {
332                    Box::new(crate::errors::RunError(format!(
333                        "run_expr_checks: raw column '{}' not found: {e}",
334                        c.name
335                    ))) as Box<dyn std::error::Error + Send + Sync>
336                })?
337                .is_not_null();
338
339            let typed_null = df
340                .column(&c.name)
341                .map_err(|e| {
342                    Box::new(crate::errors::RunError(format!(
343                        "run_expr_checks: typed column '{}' not found: {e}",
344                        c.name
345                    ))) as Box<dyn std::error::Error + Send + Sync>
346                })?
347                .is_null();
348
349            let error_mask = &typed_null & &raw_not_null;
350            let err_col_name = format!("_e_cast_{}", c.name);
351            let error_json =
352                RowError::new("cast_error", &c.name, "invalid value for target type").to_json();
353
354            let cast_err_series = bool_mask_to_error_series(&err_col_name, error_mask, &error_json);
355            checked.with_column(cast_err_series).map_err(|e| {
356                Box::new(crate::errors::RunError(format!(
357                    "run_expr_checks: could not attach cast error column '{}': {e}",
358                    err_col_name
359                ))) as Box<dyn std::error::Error + Send + Sync>
360            })?;
361            err_col_names.push(err_col_name);
362        }
363    }
364
365    if err_col_names.is_empty() {
366        return Ok(ExprCheckResult::all_accepted(df.height()));
367    }
368
369    let mut accept_mask = BooleanChunked::full("floe_accept".into(), true, checked.height());
370    let mut col_masks: Vec<(String, BooleanChunked)> = Vec::with_capacity(err_col_names.len());
371    let mut col_violation_counts: Vec<(String, u64)> = Vec::with_capacity(err_col_names.len());
372
373    for err_col in &err_col_names {
374        let col = checked.column(err_col).map_err(|e| {
375            Box::new(crate::errors::RunError(format!(
376                "run_expr_checks: error column '{err_col}' missing after eval: {e}"
377            ))) as Box<dyn std::error::Error + Send + Sync>
378        })?;
379        let null_mask = col.is_null();
380        accept_mask = &accept_mask & &null_mask;
381        let violations = (col.len() - col.null_count()) as u64;
382        col_violation_counts.push((err_col.clone(), violations));
383        col_masks.push((err_col.clone(), null_mask));
384    }
385
386    Ok(ExprCheckResult {
387        accept_mask,
388        col_masks,
389        col_violation_counts,
390    })
391}
392
393/// Converts a boolean error mask into a nullable string Series.
394/// Rows where `error_mask` is true get `error_json`; others are null.
395fn bool_mask_to_error_series(
396    col_name: &str,
397    error_mask: BooleanChunked,
398    error_json: &str,
399) -> Series {
400    use polars::prelude::StringChunked;
401    let ca: StringChunked = error_mask
402        .into_iter()
403        .map(|opt_b| {
404            if opt_b == Some(true) {
405                Some(error_json)
406            } else {
407                None
408            }
409        })
410        .collect();
411    ca.with_name(col_name.into()).into_series()
412}
413
414/// Builds the per-row formatted error string for all rejected rows, combining
415/// expression-based check errors with unique-check errors.
416/// Iterates only rejected rows (the minority), so the happy path is O(1).
417pub fn build_errors_formatted_expr(
418    height: usize,
419    accept_mask: &BooleanChunked,
420    col_masks: &[(String, BooleanChunked)],
421    unique_errors: &SparseRowErrors,
422    formatter: &dyn RowErrorFormatter,
423) -> Vec<Option<String>> {
424    let mut out = vec![None; height];
425    for (row_idx, slot) in out.iter_mut().enumerate() {
426        if accept_mask.get(row_idx) == Some(true) {
427            continue;
428        }
429        let mut row_errors: Vec<RowError> = Vec::new();
430        for (err_col_name, null_mask) in col_masks {
431            if null_mask.get(row_idx) == Some(false) {
432                if let Some(col) = err_col_name.strip_prefix("_e_nn_") {
433                    row_errors.push(RowError::new("not_null", col, "required value missing"));
434                } else if let Some(col) = err_col_name.strip_prefix("_e_cast_") {
435                    row_errors.push(RowError::new(
436                        "cast_error",
437                        col,
438                        "invalid value for target type",
439                    ));
440                }
441            }
442        }
443        if let Some(unique_row_errors) = unique_errors.get(row_idx) {
444            row_errors.extend(unique_row_errors.iter().cloned());
445        }
446        if !row_errors.is_empty() {
447            *slot = Some(formatter.format(&row_errors));
448        }
449    }
450    out
451}
452
453pub fn accept_mask_from_error_cols(
454    df: &DataFrame,
455    err_cols: &[&str],
456) -> FloeResult<BooleanChunked> {
457    let mut accept_mask = BooleanChunked::full("floe_accept".into(), true, df.height());
458    for err_col in err_cols {
459        let errors = df.column(err_col).map_err(|err| {
460            Box::new(ConfigError(format!(
461                "error column {err_col} not found: {err}"
462            )))
463        })?;
464        let no_error = errors.is_null();
465        accept_mask = &accept_mask & &no_error;
466    }
467    Ok(accept_mask)
468}
469
470pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
471    let mut accept_rows = Vec::with_capacity(errors_per_row.len());
472    for errors in errors_per_row {
473        accept_rows.push(errors.is_empty());
474    }
475    accept_rows
476}
477
478pub fn build_errors_json(
479    errors_per_row: &[Vec<RowError>],
480    accept_rows: &[bool],
481) -> Vec<Option<String>> {
482    let formatter = JsonRowErrorFormatter { source_map: None };
483    build_errors_formatted(errors_per_row, accept_rows, &formatter)
484}
485
486pub fn build_errors_formatted(
487    errors_per_row: &[Vec<RowError>],
488    accept_rows: &[bool],
489    formatter: &dyn RowErrorFormatter,
490) -> Vec<Option<String>> {
491    let mut errors_out = Vec::with_capacity(errors_per_row.len());
492    for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
493        if *accepted {
494            errors_out.push(None);
495            continue;
496        }
497        errors_out.push(Some(formatter.format(errors)));
498    }
499    errors_out
500}
501
502pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
503    let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
504    let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
505    let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
506    (accept_mask, reject_mask)
507}
508
509pub fn rejected_error_columns(
510    errors_per_row: &[Option<String>],
511    include_all_rows: bool,
512) -> (Series, Series) {
513    if include_all_rows {
514        let mut row_index = Vec::with_capacity(errors_per_row.len());
515        let mut errors = Vec::with_capacity(errors_per_row.len());
516        for (idx, err) in errors_per_row.iter().enumerate() {
517            row_index.push(idx as u64);
518            errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
519        }
520        (
521            Series::new("__floe_row_index".into(), row_index),
522            Series::new("__floe_errors".into(), errors),
523        )
524    } else {
525        let mut row_index = Vec::new();
526        let mut errors = Vec::new();
527        for (idx, err) in errors_per_row.iter().enumerate() {
528            if let Some(err) = err {
529                row_index.push(idx as u64);
530                errors.push(err.clone());
531            }
532        }
533        (
534            Series::new("__floe_row_index".into(), row_index),
535            Series::new("__floe_errors".into(), errors),
536        )
537    }
538}
539
/// Escapes `value` for embedding between the quotes of a JSON string literal.
///
/// Backslash, double quote and the control characters with short escapes
/// (`\n`, `\r`, `\t`, `\b`, `\f`) use their two-character form. Any other
/// character below U+0020 is written as `\u00XX`, because JSON forbids raw
/// control characters inside strings. All other characters pass through
/// unchanged.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            // Remaining C0 controls have no short form and need \u00XX.
            c if c < '\u{20}' => escaped.push_str(&format!("\\u{:04x}", c as u32)),
            c => escaped.push(c),
        }
    }
    escaped
}
548
549fn json_string(value: &str) -> String {
550    format!("\"{}\"", json_escape(value))
551}
552
/// Escapes `value` as a single CSV field.
///
/// A field containing a comma, double quote, line feed or carriage return is
/// wrapped in double quotes with embedded quotes doubled. A bare doubled
/// quote outside a quoted field would not round-trip through a CSV parser,
/// so a quote alone is enough to trigger wrapping. Any other value is
/// returned unchanged.
fn csv_escape(value: &str) -> String {
    let needs_quoting = value.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r'));
    if needs_quoting {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}