Skip to main content

floe_core/checks/
mod.rs

1mod cast;
2mod mismatch;
3pub mod normalize;
4mod not_null;
5mod unique;
6
7use polars::prelude::{BooleanChunked, DataFrame, NamedFrom, NewChunkedArray, Series};
8use std::collections::{BTreeMap, HashMap};
9
10use crate::{ConfigError, FloeResult};
11
12pub use cast::{cast_mismatch_counts, cast_mismatch_errors, cast_mismatch_errors_sparse};
13pub use mismatch::{
14    apply_mismatch_plan, apply_schema_mismatch, plan_schema_mismatch, resolve_mismatch_columns,
15    top_level_declared_columns, MismatchOutcome,
16};
17pub use not_null::{not_null_counts, not_null_errors, not_null_errors_sparse};
18pub use unique::{
19    resolve_schema_unique_keys, unique_counts, unique_errors, unique_errors_sparse,
20    UniqueConstraint, UniqueConstraintResult, UniqueTracker,
21};
22
23pub type ColumnIndex = HashMap<String, usize>;
24
25pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
26    df.get_column_names()
27        .iter()
28        .enumerate()
29        .map(|(idx, name)| (name.to_string(), idx))
30        .collect()
31}
32
/// A single validation failure attached to one row.
///
/// All three parts are free-form strings supplied by the check that produced the error
/// (see `RowError::new`). They are rendered by a `RowErrorFormatter`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Name of the rule that was violated.
    pub rule: String,
    /// Column the failure refers to. It is also the lookup key into a formatter's source map.
    pub column: String,
    /// Human-readable description of the failure.
    pub message: String,
}
39
40#[derive(Debug, Clone, Default)]
41pub struct SparseRowErrors {
42    row_count: usize,
43    rows: BTreeMap<usize, Vec<RowError>>,
44}
45
46impl SparseRowErrors {
47    pub fn new(row_count: usize) -> Self {
48        Self {
49            row_count,
50            rows: BTreeMap::new(),
51        }
52    }
53
54    pub fn is_empty(&self) -> bool {
55        self.rows.is_empty()
56    }
57
58    pub fn add_error(&mut self, row_idx: usize, error: RowError) {
59        self.rows.entry(row_idx).or_default().push(error);
60    }
61
62    pub fn add_errors(&mut self, row_idx: usize, errors: Vec<RowError>) {
63        if errors.is_empty() {
64            return;
65        }
66        self.rows.entry(row_idx).or_default().extend(errors);
67    }
68
69    pub fn merge(&mut self, other: SparseRowErrors) {
70        for (row_idx, errors) in other.rows {
71            self.add_errors(row_idx, errors);
72        }
73    }
74
75    pub fn accept_rows(&self) -> Vec<bool> {
76        let mut accept_rows = vec![true; self.row_count];
77        for row_idx in self.rows.keys() {
78            if let Some(slot) = accept_rows.get_mut(*row_idx) {
79                *slot = false;
80            }
81        }
82        accept_rows
83    }
84
85    pub fn build_errors_formatted(&self, formatter: &dyn RowErrorFormatter) -> Vec<Option<String>> {
86        let mut errors_out = vec![None; self.row_count];
87        for (row_idx, errors) in &self.rows {
88            if let Some(slot) = errors_out.get_mut(*row_idx) {
89                *slot = Some(formatter.format(errors));
90            }
91        }
92        errors_out
93    }
94
95    pub fn iter(&self) -> impl Iterator<Item = (&usize, &Vec<RowError>)> {
96        self.rows.iter()
97    }
98
99    pub fn error_row_count(&self) -> u64 {
100        self.rows.len() as u64
101    }
102
103    pub fn violation_count(&self) -> u64 {
104        self.rows.values().map(|errors| errors.len() as u64).sum()
105    }
106}
107
108impl RowError {
109    pub fn new(rule: &str, column: &str, message: &str) -> Self {
110        Self {
111            rule: rule.to_string(),
112            column: column.to_string(),
113            message: message.to_string(),
114        }
115    }
116
117    pub fn to_json(&self) -> String {
118        self.to_json_with_source(None)
119    }
120
121    pub fn to_json_with_source(&self, source: Option<&str>) -> String {
122        match source {
123            Some(source) => format!(
124                "{{\"rule\":\"{}\",\"column\":\"{}\",\"source\":\"{}\",\"message\":\"{}\"}}",
125                json_escape(&self.rule),
126                json_escape(&self.column),
127                json_escape(source),
128                json_escape(&self.message)
129            ),
130            None => format!(
131                "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
132                json_escape(&self.rule),
133                json_escape(&self.column),
134                json_escape(&self.message)
135            ),
136        }
137    }
138}
139
140pub trait RowErrorFormatter {
141    fn format(&self, errors: &[RowError]) -> String;
142}
143
/// Formats a row's errors as a JSON array of error objects.
///
/// `source_map`, when present, maps a column name to a source value that is added to each
/// error object for that column.
#[derive(Default)]
pub struct JsonRowErrorFormatter {
    source_map: Option<HashMap<String, String>>,
}

/// Formats a row's errors as CSV lines, wrapped in a JSON string literal.
///
/// `source_map`, when present, adds a source column to every line.
#[derive(Default)]
pub struct CsvRowErrorFormatter {
    source_map: Option<HashMap<String, String>>,
}

/// Formats a row's errors as `;`-separated text, wrapped in a JSON string literal.
///
/// `source_map`, when present, adds `source=…` for columns found in the map.
#[derive(Default)]
pub struct TextRowErrorFormatter {
    source_map: Option<HashMap<String, String>>,
}
156
157impl RowErrorFormatter for JsonRowErrorFormatter {
158    fn format(&self, errors: &[RowError]) -> String {
159        let json_items = errors
160            .iter()
161            .map(|error| {
162                let source = self
163                    .source_map
164                    .as_ref()
165                    .and_then(|map| map.get(&error.column).map(|value| value.as_str()));
166                error.to_json_with_source(source)
167            })
168            .collect::<Vec<_>>()
169            .join(",");
170        format!("[{}]", json_items)
171    }
172}
173
174impl RowErrorFormatter for CsvRowErrorFormatter {
175    fn format(&self, errors: &[RowError]) -> String {
176        let lines = errors
177            .iter()
178            .map(|error| {
179                if let Some(source_map) = self.source_map.as_ref() {
180                    let source = source_map
181                        .get(&error.column)
182                        .map(|value| value.as_str())
183                        .unwrap_or("");
184                    format!(
185                        "{},{},{},{}",
186                        csv_escape(&error.rule),
187                        csv_escape(&error.column),
188                        csv_escape(source),
189                        csv_escape(&error.message)
190                    )
191                } else {
192                    format!(
193                        "{},{},{}",
194                        csv_escape(&error.rule),
195                        csv_escape(&error.column),
196                        csv_escape(&error.message)
197                    )
198                }
199            })
200            .collect::<Vec<_>>()
201            .join("\n");
202        json_string(&lines)
203    }
204}
205
206impl RowErrorFormatter for TextRowErrorFormatter {
207    fn format(&self, errors: &[RowError]) -> String {
208        let text = errors
209            .iter()
210            .map(|error| {
211                if let Some(source_map) = self.source_map.as_ref() {
212                    if let Some(source) = source_map.get(&error.column) {
213                        return format!(
214                            "{}:{} source={} {}",
215                            error.rule, error.column, source, error.message
216                        );
217                    }
218                }
219                format!("{}:{} {}", error.rule, error.column, error.message)
220            })
221            .collect::<Vec<_>>()
222            .join("; ");
223        json_string(&text)
224    }
225}
226
227pub fn row_error_formatter(
228    name: &str,
229    source_map: Option<&HashMap<String, String>>,
230) -> FloeResult<Box<dyn RowErrorFormatter>> {
231    match name {
232        "json" => Ok(Box::new(JsonRowErrorFormatter {
233            source_map: source_map.cloned(),
234        })),
235        "csv" => Ok(Box::new(CsvRowErrorFormatter {
236            source_map: source_map.cloned(),
237        })),
238        "text" => Ok(Box::new(TextRowErrorFormatter {
239            source_map: source_map.cloned(),
240        })),
241        other => Err(Box::new(ConfigError(format!(
242            "unsupported report.formatter: {other}"
243        )))),
244    }
245}
246
247pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
248    let mut accept_rows = Vec::with_capacity(errors_per_row.len());
249    for errors in errors_per_row {
250        accept_rows.push(errors.is_empty());
251    }
252    accept_rows
253}
254
255pub fn build_errors_json(
256    errors_per_row: &[Vec<RowError>],
257    accept_rows: &[bool],
258) -> Vec<Option<String>> {
259    let formatter = JsonRowErrorFormatter { source_map: None };
260    build_errors_formatted(errors_per_row, accept_rows, &formatter)
261}
262
263pub fn build_errors_formatted(
264    errors_per_row: &[Vec<RowError>],
265    accept_rows: &[bool],
266    formatter: &dyn RowErrorFormatter,
267) -> Vec<Option<String>> {
268    let mut errors_out = Vec::with_capacity(errors_per_row.len());
269    for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
270        if *accepted {
271            errors_out.push(None);
272            continue;
273        }
274        errors_out.push(Some(formatter.format(errors)));
275    }
276    errors_out
277}
278
279pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
280    let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
281    let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
282    let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
283    (accept_mask, reject_mask)
284}
285
286pub fn rejected_error_columns(
287    errors_per_row: &[Option<String>],
288    include_all_rows: bool,
289) -> (Series, Series) {
290    if include_all_rows {
291        let mut row_index = Vec::with_capacity(errors_per_row.len());
292        let mut errors = Vec::with_capacity(errors_per_row.len());
293        for (idx, err) in errors_per_row.iter().enumerate() {
294            row_index.push(idx as u64);
295            errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
296        }
297        (
298            Series::new("__floe_row_index".into(), row_index),
299            Series::new("__floe_errors".into(), errors),
300        )
301    } else {
302        let mut row_index = Vec::new();
303        let mut errors = Vec::new();
304        for (idx, err) in errors_per_row.iter().enumerate() {
305            if let Some(err) = err {
306                row_index.push(idx as u64);
307                errors.push(err.clone());
308            }
309        }
310        (
311            Series::new("__floe_row_index".into(), row_index),
312            Series::new("__floe_errors".into(), errors),
313        )
314    }
315}
316
/// Escapes `value` so it can be placed between double quotes in a JSON document.
///
/// Per RFC 8259 §7, backslash, double quote and every control character below U+0020 must be
/// escaped. The common controls use their short forms (`\n`, `\r`, `\t`, `\b`, `\f`). All other
/// controls use `\u00XX`. Every other character, including non-ASCII, is passed through
/// unchanged. The escaping is done in a single pass over the input.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            // Any remaining C0 control would otherwise produce invalid JSON.
            c if u32::from(c) < 0x20 => {
                escaped.push_str(&format!("\\u{:04x}", u32::from(c)));
            }
            c => escaped.push(c),
        }
    }
    escaped
}

/// Wraps `value` in double quotes as a JSON string literal, escaping it with [`json_escape`].
fn json_string(value: &str) -> String {
    format!("\"{}\"", json_escape(value))
}
329
/// Escapes one CSV field following RFC 4180.
///
/// A field containing a double quote, comma, CR or LF is wrapped in double quotes, and its
/// embedded quotes are doubled. Any other field is returned as-is. Wrapping is also required
/// for quote-only fields: without it, the doubled quotes would be read literally and the
/// output would be malformed.
fn csv_escape(value: &str) -> String {
    let needs_quoting = value.contains(['"', ',', '\n', '\r']);
    if needs_quoting {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}