Skip to main content

floe_core/checks/
mod.rs

1mod cast;
2mod mismatch;
3pub mod normalize;
4mod not_null;
5mod unique;
6
7use polars::prelude::{BooleanChunked, DataFrame, NamedFrom, NewChunkedArray, Series};
8use std::collections::{BTreeMap, HashMap};
9
10use crate::{ConfigError, FloeResult};
11
12pub use cast::{cast_mismatch_counts, cast_mismatch_errors, cast_mismatch_errors_sparse};
13pub use mismatch::{
14    apply_mismatch_plan, apply_schema_mismatch, plan_schema_mismatch, MismatchOutcome,
15};
16pub use not_null::{not_null_counts, not_null_errors, not_null_errors_sparse};
17pub use unique::{unique_counts, unique_errors, unique_errors_sparse, UniqueTracker};
18
19pub type ColumnIndex = HashMap<String, usize>;
20
21pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
22    df.get_column_names()
23        .iter()
24        .enumerate()
25        .map(|(idx, name)| (name.to_string(), idx))
26        .collect()
27}
28
/// One validation failure attached to a single row.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RowError {
    /// Name of the validation rule that reported the failure.
    pub rule: String,
    /// Column the failure refers to.
    pub column: String,
    /// Free-form description of the failure.
    pub message: String,
}
35
36#[derive(Debug, Clone, Default)]
37pub struct SparseRowErrors {
38    row_count: usize,
39    rows: BTreeMap<usize, Vec<RowError>>,
40}
41
42impl SparseRowErrors {
43    pub fn new(row_count: usize) -> Self {
44        Self {
45            row_count,
46            rows: BTreeMap::new(),
47        }
48    }
49
50    pub fn is_empty(&self) -> bool {
51        self.rows.is_empty()
52    }
53
54    pub fn add_error(&mut self, row_idx: usize, error: RowError) {
55        self.rows.entry(row_idx).or_default().push(error);
56    }
57
58    pub fn add_errors(&mut self, row_idx: usize, errors: Vec<RowError>) {
59        if errors.is_empty() {
60            return;
61        }
62        self.rows.entry(row_idx).or_default().extend(errors);
63    }
64
65    pub fn merge(&mut self, other: SparseRowErrors) {
66        for (row_idx, errors) in other.rows {
67            self.add_errors(row_idx, errors);
68        }
69    }
70
71    pub fn accept_rows(&self) -> Vec<bool> {
72        let mut accept_rows = vec![true; self.row_count];
73        for row_idx in self.rows.keys() {
74            if let Some(slot) = accept_rows.get_mut(*row_idx) {
75                *slot = false;
76            }
77        }
78        accept_rows
79    }
80
81    pub fn build_errors_formatted(&self, formatter: &dyn RowErrorFormatter) -> Vec<Option<String>> {
82        let mut errors_out = vec![None; self.row_count];
83        for (row_idx, errors) in &self.rows {
84            if let Some(slot) = errors_out.get_mut(*row_idx) {
85                *slot = Some(formatter.format(errors));
86            }
87        }
88        errors_out
89    }
90
91    pub fn iter(&self) -> impl Iterator<Item = (&usize, &Vec<RowError>)> {
92        self.rows.iter()
93    }
94
95    pub fn error_row_count(&self) -> u64 {
96        self.rows.len() as u64
97    }
98
99    pub fn violation_count(&self) -> u64 {
100        self.rows.values().map(|errors| errors.len() as u64).sum()
101    }
102}
103
104impl RowError {
105    pub fn new(rule: &str, column: &str, message: &str) -> Self {
106        Self {
107            rule: rule.to_string(),
108            column: column.to_string(),
109            message: message.to_string(),
110        }
111    }
112
113    pub fn to_json(&self) -> String {
114        format!(
115            "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
116            json_escape(&self.rule),
117            json_escape(&self.column),
118            json_escape(&self.message)
119        )
120    }
121}
122
123pub trait RowErrorFormatter {
124    fn format(&self, errors: &[RowError]) -> String;
125}
126
127pub struct JsonRowErrorFormatter;
128pub struct CsvRowErrorFormatter;
129pub struct TextRowErrorFormatter;
130
131impl RowErrorFormatter for JsonRowErrorFormatter {
132    fn format(&self, errors: &[RowError]) -> String {
133        let json_items = errors
134            .iter()
135            .map(RowError::to_json)
136            .collect::<Vec<_>>()
137            .join(",");
138        format!("[{}]", json_items)
139    }
140}
141
142impl RowErrorFormatter for CsvRowErrorFormatter {
143    fn format(&self, errors: &[RowError]) -> String {
144        let lines = errors
145            .iter()
146            .map(|error| {
147                format!(
148                    "{},{},{}",
149                    csv_escape(&error.rule),
150                    csv_escape(&error.column),
151                    csv_escape(&error.message)
152                )
153            })
154            .collect::<Vec<_>>()
155            .join("\n");
156        json_string(&lines)
157    }
158}
159
160impl RowErrorFormatter for TextRowErrorFormatter {
161    fn format(&self, errors: &[RowError]) -> String {
162        let text = errors
163            .iter()
164            .map(|error| format!("{}:{} {}", error.rule, error.column, error.message))
165            .collect::<Vec<_>>()
166            .join("; ");
167        json_string(&text)
168    }
169}
170
171pub fn row_error_formatter(name: &str) -> FloeResult<Box<dyn RowErrorFormatter>> {
172    match name {
173        "json" => Ok(Box::new(JsonRowErrorFormatter)),
174        "csv" => Ok(Box::new(CsvRowErrorFormatter)),
175        "text" => Ok(Box::new(TextRowErrorFormatter)),
176        other => Err(Box::new(ConfigError(format!(
177            "unsupported report.formatter: {other}"
178        )))),
179    }
180}
181
182pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
183    let mut accept_rows = Vec::with_capacity(errors_per_row.len());
184    for errors in errors_per_row {
185        accept_rows.push(errors.is_empty());
186    }
187    accept_rows
188}
189
190pub fn build_errors_json(
191    errors_per_row: &[Vec<RowError>],
192    accept_rows: &[bool],
193) -> Vec<Option<String>> {
194    build_errors_formatted(errors_per_row, accept_rows, &JsonRowErrorFormatter)
195}
196
197pub fn build_errors_formatted(
198    errors_per_row: &[Vec<RowError>],
199    accept_rows: &[bool],
200    formatter: &dyn RowErrorFormatter,
201) -> Vec<Option<String>> {
202    let mut errors_out = Vec::with_capacity(errors_per_row.len());
203    for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
204        if *accepted {
205            errors_out.push(None);
206            continue;
207        }
208        errors_out.push(Some(formatter.format(errors)));
209    }
210    errors_out
211}
212
213pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
214    let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
215    let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
216    let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
217    (accept_mask, reject_mask)
218}
219
220pub fn rejected_error_columns(
221    errors_per_row: &[Option<String>],
222    include_all_rows: bool,
223) -> (Series, Series) {
224    if include_all_rows {
225        let mut row_index = Vec::with_capacity(errors_per_row.len());
226        let mut errors = Vec::with_capacity(errors_per_row.len());
227        for (idx, err) in errors_per_row.iter().enumerate() {
228            row_index.push(idx as u64);
229            errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
230        }
231        (
232            Series::new("__floe_row_index".into(), row_index),
233            Series::new("__floe_errors".into(), errors),
234        )
235    } else {
236        let mut row_index = Vec::new();
237        let mut errors = Vec::new();
238        for (idx, err) in errors_per_row.iter().enumerate() {
239            if let Some(err) = err {
240                row_index.push(idx as u64);
241                errors.push(err.clone());
242            }
243        }
244        (
245            Series::new("__floe_row_index".into(), row_index),
246            Series::new("__floe_errors".into(), errors),
247        )
248    }
249}
250
/// Escapes `value` so it can be placed between double quotes in a JSON document.
///
/// Backslashes and double quotes are backslash-escaped. `\n`, `\r`, `\t`,
/// backspace and form feed use their short escapes. Every other control
/// character below U+0020 is written as `\u00XX`, which RFC 8259 requires;
/// leaving them raw would produce invalid JSON. All other characters, including
/// non-ASCII, are copied unchanged.
fn json_escape(value: &str) -> String {
    // Single pass: one allocation instead of one per chained `replace`.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            c if c < '\u{20}' => escaped.push_str(&format!("\\u{:04x}", c as u32)),
            c => escaped.push(c),
        }
    }
    escaped
}
259
260fn json_string(value: &str) -> String {
261    format!("\"{}\"", json_escape(value))
262}
263
/// Escapes one CSV field following RFC 4180.
///
/// A field containing a comma, a double quote, a line feed or a carriage return
/// is wrapped in double quotes, with embedded quotes doubled. Other fields are
/// returned unchanged. (Previously a field whose only special character was `"`
/// had its quotes doubled but was left unquoted, which RFC 4180 forbids.)
fn csv_escape(value: &str) -> String {
    let needs_quotes = value.contains([',', '"', '\n', '\r']);
    if needs_quotes {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}