Skip to main content

floe_core/checks/
mod.rs

1mod cast;
2mod mismatch;
3pub mod normalize;
4mod not_null;
5mod unique;
6
7use polars::prelude::{BooleanChunked, DataFrame, NamedFrom, NewChunkedArray, Series};
8use std::collections::{BTreeMap, HashMap};
9
10use crate::{ConfigError, FloeResult};
11
12pub use cast::{cast_mismatch_counts, cast_mismatch_errors, cast_mismatch_errors_sparse};
13pub use mismatch::{
14    apply_mismatch_plan, apply_schema_mismatch, plan_schema_mismatch, resolve_mismatch_columns,
15    top_level_declared_columns, MismatchOutcome,
16};
17pub use not_null::{not_null_counts, not_null_errors, not_null_errors_sparse};
18pub use unique::{unique_counts, unique_errors, unique_errors_sparse, UniqueTracker};
19
20pub type ColumnIndex = HashMap<String, usize>;
21
22pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
23    df.get_column_names()
24        .iter()
25        .enumerate()
26        .map(|(idx, name)| (name.to_string(), idx))
27        .collect()
28}
29
/// One recorded violation: which rule fired, on which column, and the
/// accompanying message text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Identifier of the rule that produced this error.
    pub rule: String,
    /// Name of the column the error is attached to; formatters use it as the
    /// key when looking up a source name.
    pub column: String,
    /// Free-form message text describing the error.
    pub message: String,
}
36
37#[derive(Debug, Clone, Default)]
38pub struct SparseRowErrors {
39    row_count: usize,
40    rows: BTreeMap<usize, Vec<RowError>>,
41}
42
43impl SparseRowErrors {
44    pub fn new(row_count: usize) -> Self {
45        Self {
46            row_count,
47            rows: BTreeMap::new(),
48        }
49    }
50
51    pub fn is_empty(&self) -> bool {
52        self.rows.is_empty()
53    }
54
55    pub fn add_error(&mut self, row_idx: usize, error: RowError) {
56        self.rows.entry(row_idx).or_default().push(error);
57    }
58
59    pub fn add_errors(&mut self, row_idx: usize, errors: Vec<RowError>) {
60        if errors.is_empty() {
61            return;
62        }
63        self.rows.entry(row_idx).or_default().extend(errors);
64    }
65
66    pub fn merge(&mut self, other: SparseRowErrors) {
67        for (row_idx, errors) in other.rows {
68            self.add_errors(row_idx, errors);
69        }
70    }
71
72    pub fn accept_rows(&self) -> Vec<bool> {
73        let mut accept_rows = vec![true; self.row_count];
74        for row_idx in self.rows.keys() {
75            if let Some(slot) = accept_rows.get_mut(*row_idx) {
76                *slot = false;
77            }
78        }
79        accept_rows
80    }
81
82    pub fn build_errors_formatted(&self, formatter: &dyn RowErrorFormatter) -> Vec<Option<String>> {
83        let mut errors_out = vec![None; self.row_count];
84        for (row_idx, errors) in &self.rows {
85            if let Some(slot) = errors_out.get_mut(*row_idx) {
86                *slot = Some(formatter.format(errors));
87            }
88        }
89        errors_out
90    }
91
92    pub fn iter(&self) -> impl Iterator<Item = (&usize, &Vec<RowError>)> {
93        self.rows.iter()
94    }
95
96    pub fn error_row_count(&self) -> u64 {
97        self.rows.len() as u64
98    }
99
100    pub fn violation_count(&self) -> u64 {
101        self.rows.values().map(|errors| errors.len() as u64).sum()
102    }
103}
104
105impl RowError {
106    pub fn new(rule: &str, column: &str, message: &str) -> Self {
107        Self {
108            rule: rule.to_string(),
109            column: column.to_string(),
110            message: message.to_string(),
111        }
112    }
113
114    pub fn to_json(&self) -> String {
115        self.to_json_with_source(None)
116    }
117
118    pub fn to_json_with_source(&self, source: Option<&str>) -> String {
119        match source {
120            Some(source) => format!(
121                "{{\"rule\":\"{}\",\"column\":\"{}\",\"source\":\"{}\",\"message\":\"{}\"}}",
122                json_escape(&self.rule),
123                json_escape(&self.column),
124                json_escape(source),
125                json_escape(&self.message)
126            ),
127            None => format!(
128                "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
129                json_escape(&self.rule),
130                json_escape(&self.column),
131                json_escape(&self.message)
132            ),
133        }
134    }
135}
136
137pub trait RowErrorFormatter {
138    fn format(&self, errors: &[RowError]) -> String;
139}
140
/// Formatter that renders a row's errors as a JSON array of objects.
#[derive(Default)]
pub struct JsonRowErrorFormatter {
    /// Optional lookup keyed by error column name; a hit is emitted as the
    /// object's `source` value.
    source_map: Option<HashMap<String, String>>,
}
/// Formatter that renders a row's errors as CSV-style lines, one per error.
#[derive(Default)]
pub struct CsvRowErrorFormatter {
    /// Optional lookup keyed by error column name; when the map is present a
    /// source field is added to every line (empty when the column is absent).
    source_map: Option<HashMap<String, String>>,
}
/// Formatter that renders a row's errors as `; `-separated text fragments.
#[derive(Default)]
pub struct TextRowErrorFormatter {
    /// Optional lookup keyed by error column name; a hit adds a `source=`
    /// part to that error's fragment.
    source_map: Option<HashMap<String, String>>,
}
153
154impl RowErrorFormatter for JsonRowErrorFormatter {
155    fn format(&self, errors: &[RowError]) -> String {
156        let json_items = errors
157            .iter()
158            .map(|error| {
159                let source = self
160                    .source_map
161                    .as_ref()
162                    .and_then(|map| map.get(&error.column).map(|value| value.as_str()));
163                error.to_json_with_source(source)
164            })
165            .collect::<Vec<_>>()
166            .join(",");
167        format!("[{}]", json_items)
168    }
169}
170
171impl RowErrorFormatter for CsvRowErrorFormatter {
172    fn format(&self, errors: &[RowError]) -> String {
173        let lines = errors
174            .iter()
175            .map(|error| {
176                if let Some(source_map) = self.source_map.as_ref() {
177                    let source = source_map
178                        .get(&error.column)
179                        .map(|value| value.as_str())
180                        .unwrap_or("");
181                    format!(
182                        "{},{},{},{}",
183                        csv_escape(&error.rule),
184                        csv_escape(&error.column),
185                        csv_escape(source),
186                        csv_escape(&error.message)
187                    )
188                } else {
189                    format!(
190                        "{},{},{}",
191                        csv_escape(&error.rule),
192                        csv_escape(&error.column),
193                        csv_escape(&error.message)
194                    )
195                }
196            })
197            .collect::<Vec<_>>()
198            .join("\n");
199        json_string(&lines)
200    }
201}
202
203impl RowErrorFormatter for TextRowErrorFormatter {
204    fn format(&self, errors: &[RowError]) -> String {
205        let text = errors
206            .iter()
207            .map(|error| {
208                if let Some(source_map) = self.source_map.as_ref() {
209                    if let Some(source) = source_map.get(&error.column) {
210                        return format!(
211                            "{}:{} source={} {}",
212                            error.rule, error.column, source, error.message
213                        );
214                    }
215                }
216                format!("{}:{} {}", error.rule, error.column, error.message)
217            })
218            .collect::<Vec<_>>()
219            .join("; ");
220        json_string(&text)
221    }
222}
223
224pub fn row_error_formatter(
225    name: &str,
226    source_map: Option<&HashMap<String, String>>,
227) -> FloeResult<Box<dyn RowErrorFormatter>> {
228    match name {
229        "json" => Ok(Box::new(JsonRowErrorFormatter {
230            source_map: source_map.cloned(),
231        })),
232        "csv" => Ok(Box::new(CsvRowErrorFormatter {
233            source_map: source_map.cloned(),
234        })),
235        "text" => Ok(Box::new(TextRowErrorFormatter {
236            source_map: source_map.cloned(),
237        })),
238        other => Err(Box::new(ConfigError(format!(
239            "unsupported report.formatter: {other}"
240        )))),
241    }
242}
243
244pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
245    let mut accept_rows = Vec::with_capacity(errors_per_row.len());
246    for errors in errors_per_row {
247        accept_rows.push(errors.is_empty());
248    }
249    accept_rows
250}
251
252pub fn build_errors_json(
253    errors_per_row: &[Vec<RowError>],
254    accept_rows: &[bool],
255) -> Vec<Option<String>> {
256    let formatter = JsonRowErrorFormatter { source_map: None };
257    build_errors_formatted(errors_per_row, accept_rows, &formatter)
258}
259
260pub fn build_errors_formatted(
261    errors_per_row: &[Vec<RowError>],
262    accept_rows: &[bool],
263    formatter: &dyn RowErrorFormatter,
264) -> Vec<Option<String>> {
265    let mut errors_out = Vec::with_capacity(errors_per_row.len());
266    for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
267        if *accepted {
268            errors_out.push(None);
269            continue;
270        }
271        errors_out.push(Some(formatter.format(errors)));
272    }
273    errors_out
274}
275
276pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
277    let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
278    let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
279    let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
280    (accept_mask, reject_mask)
281}
282
283pub fn rejected_error_columns(
284    errors_per_row: &[Option<String>],
285    include_all_rows: bool,
286) -> (Series, Series) {
287    if include_all_rows {
288        let mut row_index = Vec::with_capacity(errors_per_row.len());
289        let mut errors = Vec::with_capacity(errors_per_row.len());
290        for (idx, err) in errors_per_row.iter().enumerate() {
291            row_index.push(idx as u64);
292            errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
293        }
294        (
295            Series::new("__floe_row_index".into(), row_index),
296            Series::new("__floe_errors".into(), errors),
297        )
298    } else {
299        let mut row_index = Vec::new();
300        let mut errors = Vec::new();
301        for (idx, err) in errors_per_row.iter().enumerate() {
302            if let Some(err) = err {
303                row_index.push(idx as u64);
304                errors.push(err.clone());
305            }
306        }
307        (
308            Series::new("__floe_row_index".into(), row_index),
309            Series::new("__floe_errors".into(), errors),
310        )
311    }
312}
313
/// Escapes `value` so it can be placed between double quotes in a JSON
/// document.
///
/// Backslash and double quote are backslash-escaped. The control characters
/// with short forms (`\n`, `\r`, `\t`, `\b`, `\f`) use those forms, and every
/// other character below U+0020 is written as `\u00XX`, because JSON forbids
/// raw control characters inside strings. All other characters are copied
/// unchanged.
fn json_escape(value: &str) -> String {
    // Single pass; most inputs need no escaping, so the input length is a
    // good capacity estimate.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            // Remaining C0 controls have no short form.
            c if c < '\u{20}' => escaped.push_str(&format!("\\u{:04x}", c as u32)),
            c => escaped.push(c),
        }
    }
    escaped
}
322
323fn json_string(value: &str) -> String {
324    format!("\"{}\"", json_escape(value))
325}
326
/// Escapes `value` for use as a single CSV field.
///
/// A field containing a double quote, comma, line feed or carriage return is
/// wrapped in double quotes, with embedded double quotes doubled. Any other
/// value is returned unchanged.
fn csv_escape(value: &str) -> String {
    // A bare double quote also forces quoting: doubling it without wrapping
    // the field would produce a malformed record.
    let needs_quoting = value.contains(|c: char| matches!(c, '"' | ',' | '\n' | '\r'));
    if needs_quoting {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}