Skip to main content

floe_core/checks/
mod.rs

1mod cast;
2mod mismatch;
3mod not_null;
4mod unique;
5
6use polars::prelude::{BooleanChunked, DataFrame, NamedFrom, NewChunkedArray, Series};
7use std::collections::HashMap;
8
9use crate::{ConfigError, FloeResult};
10
11pub use cast::{cast_mismatch_counts, cast_mismatch_errors};
12pub use mismatch::{
13    apply_mismatch_plan, apply_schema_mismatch, plan_schema_mismatch, MismatchOutcome,
14};
15pub use not_null::{not_null_counts, not_null_errors};
16pub use unique::{unique_counts, unique_errors, UniqueTracker};
17
18pub type ColumnIndex = HashMap<String, usize>;
19
20pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
21    df.get_column_names()
22        .iter()
23        .enumerate()
24        .map(|(idx, name)| (name.to_string(), idx))
25        .collect()
26}
27
/// A single validation failure attached to one row.
///
/// All three fields are free-form strings; they are emitted verbatim (after
/// escaping) by the row-error formatters in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Identifier of the rule that reported the error.
    pub rule: String,
    /// Name of the column the error refers to.
    pub column: String,
    /// Human-readable description of the failure.
    pub message: String,
}
34
35impl RowError {
36    pub fn new(rule: &str, column: &str, message: &str) -> Self {
37        Self {
38            rule: rule.to_string(),
39            column: column.to_string(),
40            message: message.to_string(),
41        }
42    }
43
44    pub fn to_json(&self) -> String {
45        format!(
46            "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
47            json_escape(&self.rule),
48            json_escape(&self.column),
49            json_escape(&self.message)
50        )
51    }
52}
53
54pub trait RowErrorFormatter {
55    fn format(&self, errors: &[RowError]) -> String;
56}
57
/// Formatter that renders a row's errors as a JSON array of objects.
///
/// Selected by the name `"json"` in `row_error_formatter`.
#[derive(Debug, Clone, Copy, Default)]
pub struct JsonRowErrorFormatter;

/// Formatter that renders a row's errors as CSV lines (`rule,column,message`),
/// wrapped into a single JSON string.
///
/// Selected by the name `"csv"` in `row_error_formatter`.
#[derive(Debug, Clone, Copy, Default)]
pub struct CsvRowErrorFormatter;

/// Formatter that renders a row's errors as `rule:column message` entries
/// joined by `"; "`, wrapped into a single JSON string.
///
/// Selected by the name `"text"` in `row_error_formatter`.
#[derive(Debug, Clone, Copy, Default)]
pub struct TextRowErrorFormatter;
61
62impl RowErrorFormatter for JsonRowErrorFormatter {
63    fn format(&self, errors: &[RowError]) -> String {
64        let json_items = errors
65            .iter()
66            .map(RowError::to_json)
67            .collect::<Vec<_>>()
68            .join(",");
69        format!("[{}]", json_items)
70    }
71}
72
73impl RowErrorFormatter for CsvRowErrorFormatter {
74    fn format(&self, errors: &[RowError]) -> String {
75        let lines = errors
76            .iter()
77            .map(|error| {
78                format!(
79                    "{},{},{}",
80                    csv_escape(&error.rule),
81                    csv_escape(&error.column),
82                    csv_escape(&error.message)
83                )
84            })
85            .collect::<Vec<_>>()
86            .join("\n");
87        json_string(&lines)
88    }
89}
90
91impl RowErrorFormatter for TextRowErrorFormatter {
92    fn format(&self, errors: &[RowError]) -> String {
93        let text = errors
94            .iter()
95            .map(|error| format!("{}:{} {}", error.rule, error.column, error.message))
96            .collect::<Vec<_>>()
97            .join("; ");
98        json_string(&text)
99    }
100}
101
102pub fn row_error_formatter(name: &str) -> FloeResult<Box<dyn RowErrorFormatter>> {
103    match name {
104        "json" => Ok(Box::new(JsonRowErrorFormatter)),
105        "csv" => Ok(Box::new(CsvRowErrorFormatter)),
106        "text" => Ok(Box::new(TextRowErrorFormatter)),
107        other => Err(Box::new(ConfigError(format!(
108            "unsupported report.formatter: {other}"
109        )))),
110    }
111}
112
113pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
114    let mut accept_rows = Vec::with_capacity(errors_per_row.len());
115    for errors in errors_per_row {
116        accept_rows.push(errors.is_empty());
117    }
118    accept_rows
119}
120
121pub fn build_errors_json(
122    errors_per_row: &[Vec<RowError>],
123    accept_rows: &[bool],
124) -> Vec<Option<String>> {
125    build_errors_formatted(errors_per_row, accept_rows, &JsonRowErrorFormatter)
126}
127
128pub fn build_errors_formatted(
129    errors_per_row: &[Vec<RowError>],
130    accept_rows: &[bool],
131    formatter: &dyn RowErrorFormatter,
132) -> Vec<Option<String>> {
133    let mut errors_out = Vec::with_capacity(errors_per_row.len());
134    for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
135        if *accepted {
136            errors_out.push(None);
137            continue;
138        }
139        errors_out.push(Some(formatter.format(errors)));
140    }
141    errors_out
142}
143
144pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
145    let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
146    let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
147    let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
148    (accept_mask, reject_mask)
149}
150
151pub fn rejected_error_columns(
152    errors_per_row: &[Option<String>],
153    include_all_rows: bool,
154) -> (Series, Series) {
155    if include_all_rows {
156        let mut row_index = Vec::with_capacity(errors_per_row.len());
157        let mut errors = Vec::with_capacity(errors_per_row.len());
158        for (idx, err) in errors_per_row.iter().enumerate() {
159            row_index.push(idx as u64);
160            errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
161        }
162        (
163            Series::new("__floe_row_index".into(), row_index),
164            Series::new("__floe_errors".into(), errors),
165        )
166    } else {
167        let mut row_index = Vec::new();
168        let mut errors = Vec::new();
169        for (idx, err) in errors_per_row.iter().enumerate() {
170            if let Some(err) = err {
171                row_index.push(idx as u64);
172                errors.push(err.clone());
173            }
174        }
175        (
176            Series::new("__floe_row_index".into(), row_index),
177            Series::new("__floe_errors".into(), errors),
178        )
179    }
180}
181
/// Escapes `value` so it can be embedded between double quotes in a JSON
/// document.
///
/// Backslash, double quote, and every control character below U+0020 are
/// escaped: the short forms `\n`, `\r`, `\t`, `\b`, `\f` are used where JSON
/// defines them, and `\u00XX` otherwise. All other characters (including
/// non-ASCII) are copied through unchanged. The surrounding quotes are NOT
/// added.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            // JSON forbids raw control characters inside strings; anything
            // without a short escape must be written as \u00XX.
            control if control < '\u{20}' => {
                escaped.push_str(&format!("\\u{:04x}", control as u32));
            }
            other => escaped.push(other),
        }
    }
    escaped
}
190
191fn json_string(value: &str) -> String {
192    format!("\"{}\"", json_escape(value))
193}
194
/// Escapes a single CSV field.
///
/// A field containing a double quote, comma, line feed, or carriage return is
/// enclosed in double quotes, with every embedded double quote doubled. Any
/// other field is returned unchanged.
///
/// Fields that contain a quote but no separator are quoted as well: a doubled
/// quote is only meaningful inside a quoted field, so leaving such a field
/// bare would change its value when the record is parsed back.
fn csv_escape(value: &str) -> String {
    let needs_quoting = value.contains(|ch: char| matches!(ch, '"' | ',' | '\n' | '\r'));
    if needs_quoting {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}