Skip to main content

floe_core/checks/
mod.rs

1mod cast;
2mod mismatch;
3mod not_null;
4mod unique;
5
6use polars::prelude::{BooleanChunked, DataFrame, NamedFrom, NewChunkedArray, Series};
7use std::collections::{BTreeMap, HashMap};
8
9use crate::{ConfigError, FloeResult};
10
11pub use cast::{cast_mismatch_counts, cast_mismatch_errors, cast_mismatch_errors_sparse};
12pub use mismatch::{
13    apply_mismatch_plan, apply_schema_mismatch, plan_schema_mismatch, MismatchOutcome,
14};
15pub use not_null::{not_null_counts, not_null_errors, not_null_errors_sparse};
16pub use unique::{unique_counts, unique_errors, unique_errors_sparse, UniqueTracker};
17
18pub type ColumnIndex = HashMap<String, usize>;
19
20pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
21    df.get_column_names()
22        .iter()
23        .enumerate()
24        .map(|(idx, name)| (name.to_string(), idx))
25        .collect()
26}
27
/// One validation failure, described by three owned strings.
///
/// The fields are serialised under the JSON keys `rule`, `column` and
/// `message` by `RowError::to_json`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RowError {
    /// Identifier of the rule that produced this error.
    pub rule: String,
    /// Name of the column the error is attached to.
    pub column: String,
    /// Free-form description of the failure.
    pub message: String,
}
34
35#[derive(Debug, Clone, Default)]
36pub struct SparseRowErrors {
37    row_count: usize,
38    rows: BTreeMap<usize, Vec<RowError>>,
39}
40
41impl SparseRowErrors {
42    pub fn new(row_count: usize) -> Self {
43        Self {
44            row_count,
45            rows: BTreeMap::new(),
46        }
47    }
48
49    pub fn is_empty(&self) -> bool {
50        self.rows.is_empty()
51    }
52
53    pub fn add_error(&mut self, row_idx: usize, error: RowError) {
54        self.rows.entry(row_idx).or_default().push(error);
55    }
56
57    pub fn add_errors(&mut self, row_idx: usize, errors: Vec<RowError>) {
58        if errors.is_empty() {
59            return;
60        }
61        self.rows.entry(row_idx).or_default().extend(errors);
62    }
63
64    pub fn merge(&mut self, other: SparseRowErrors) {
65        for (row_idx, errors) in other.rows {
66            self.add_errors(row_idx, errors);
67        }
68    }
69
70    pub fn accept_rows(&self) -> Vec<bool> {
71        let mut accept_rows = vec![true; self.row_count];
72        for row_idx in self.rows.keys() {
73            if let Some(slot) = accept_rows.get_mut(*row_idx) {
74                *slot = false;
75            }
76        }
77        accept_rows
78    }
79
80    pub fn build_errors_formatted(&self, formatter: &dyn RowErrorFormatter) -> Vec<Option<String>> {
81        let mut errors_out = vec![None; self.row_count];
82        for (row_idx, errors) in &self.rows {
83            if let Some(slot) = errors_out.get_mut(*row_idx) {
84                *slot = Some(formatter.format(errors));
85            }
86        }
87        errors_out
88    }
89
90    pub fn iter(&self) -> impl Iterator<Item = (&usize, &Vec<RowError>)> {
91        self.rows.iter()
92    }
93
94    pub fn error_row_count(&self) -> u64 {
95        self.rows.len() as u64
96    }
97
98    pub fn violation_count(&self) -> u64 {
99        self.rows.values().map(|errors| errors.len() as u64).sum()
100    }
101}
102
103impl RowError {
104    pub fn new(rule: &str, column: &str, message: &str) -> Self {
105        Self {
106            rule: rule.to_string(),
107            column: column.to_string(),
108            message: message.to_string(),
109        }
110    }
111
112    pub fn to_json(&self) -> String {
113        format!(
114            "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
115            json_escape(&self.rule),
116            json_escape(&self.column),
117            json_escape(&self.message)
118        )
119    }
120}
121
122pub trait RowErrorFormatter {
123    fn format(&self, errors: &[RowError]) -> String;
124}
125
/// Formatter that renders a row's errors as a JSON array of objects.
#[derive(Debug, Clone, Copy, Default)]
pub struct JsonRowErrorFormatter;

/// Formatter that renders a row's errors as CSV lines inside a JSON string.
#[derive(Debug, Clone, Copy, Default)]
pub struct CsvRowErrorFormatter;

/// Formatter that renders a row's errors as `; `-separated text inside a JSON string.
#[derive(Debug, Clone, Copy, Default)]
pub struct TextRowErrorFormatter;
129
130impl RowErrorFormatter for JsonRowErrorFormatter {
131    fn format(&self, errors: &[RowError]) -> String {
132        let json_items = errors
133            .iter()
134            .map(RowError::to_json)
135            .collect::<Vec<_>>()
136            .join(",");
137        format!("[{}]", json_items)
138    }
139}
140
141impl RowErrorFormatter for CsvRowErrorFormatter {
142    fn format(&self, errors: &[RowError]) -> String {
143        let lines = errors
144            .iter()
145            .map(|error| {
146                format!(
147                    "{},{},{}",
148                    csv_escape(&error.rule),
149                    csv_escape(&error.column),
150                    csv_escape(&error.message)
151                )
152            })
153            .collect::<Vec<_>>()
154            .join("\n");
155        json_string(&lines)
156    }
157}
158
159impl RowErrorFormatter for TextRowErrorFormatter {
160    fn format(&self, errors: &[RowError]) -> String {
161        let text = errors
162            .iter()
163            .map(|error| format!("{}:{} {}", error.rule, error.column, error.message))
164            .collect::<Vec<_>>()
165            .join("; ");
166        json_string(&text)
167    }
168}
169
170pub fn row_error_formatter(name: &str) -> FloeResult<Box<dyn RowErrorFormatter>> {
171    match name {
172        "json" => Ok(Box::new(JsonRowErrorFormatter)),
173        "csv" => Ok(Box::new(CsvRowErrorFormatter)),
174        "text" => Ok(Box::new(TextRowErrorFormatter)),
175        other => Err(Box::new(ConfigError(format!(
176            "unsupported report.formatter: {other}"
177        )))),
178    }
179}
180
181pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
182    let mut accept_rows = Vec::with_capacity(errors_per_row.len());
183    for errors in errors_per_row {
184        accept_rows.push(errors.is_empty());
185    }
186    accept_rows
187}
188
189pub fn build_errors_json(
190    errors_per_row: &[Vec<RowError>],
191    accept_rows: &[bool],
192) -> Vec<Option<String>> {
193    build_errors_formatted(errors_per_row, accept_rows, &JsonRowErrorFormatter)
194}
195
196pub fn build_errors_formatted(
197    errors_per_row: &[Vec<RowError>],
198    accept_rows: &[bool],
199    formatter: &dyn RowErrorFormatter,
200) -> Vec<Option<String>> {
201    let mut errors_out = Vec::with_capacity(errors_per_row.len());
202    for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
203        if *accepted {
204            errors_out.push(None);
205            continue;
206        }
207        errors_out.push(Some(formatter.format(errors)));
208    }
209    errors_out
210}
211
212pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
213    let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
214    let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
215    let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
216    (accept_mask, reject_mask)
217}
218
219pub fn rejected_error_columns(
220    errors_per_row: &[Option<String>],
221    include_all_rows: bool,
222) -> (Series, Series) {
223    if include_all_rows {
224        let mut row_index = Vec::with_capacity(errors_per_row.len());
225        let mut errors = Vec::with_capacity(errors_per_row.len());
226        for (idx, err) in errors_per_row.iter().enumerate() {
227            row_index.push(idx as u64);
228            errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
229        }
230        (
231            Series::new("__floe_row_index".into(), row_index),
232            Series::new("__floe_errors".into(), errors),
233        )
234    } else {
235        let mut row_index = Vec::new();
236        let mut errors = Vec::new();
237        for (idx, err) in errors_per_row.iter().enumerate() {
238            if let Some(err) = err {
239                row_index.push(idx as u64);
240                errors.push(err.clone());
241            }
242        }
243        (
244            Series::new("__floe_row_index".into(), row_index),
245            Series::new("__floe_errors".into(), errors),
246        )
247    }
248}
249
/// Escapes `value` so it can be placed between the quotes of a JSON string.
///
/// Backslash and double quote are backslash-escaped, and every control
/// character in U+0000..=U+001F is escaped as JSON requires: the short forms
/// `\n`, `\r`, `\t`, `\b`, `\f` where they exist, `\u00XX` otherwise. All
/// other characters are copied through unchanged.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{0008}' => escaped.push_str("\\b"),
            '\u{000C}' => escaped.push_str("\\f"),
            // Remaining control characters have no short form; a raw one
            // would make the surrounding JSON document invalid.
            control if (control as u32) < 0x20 => {
                escaped.push_str(&format!("\\u{:04x}", control as u32));
            }
            other => escaped.push(other),
        }
    }
    escaped
}
258
259fn json_string(value: &str) -> String {
260    format!("\"{}\"", json_escape(value))
261}
262
/// Escapes a single CSV field.
///
/// A field containing a double quote, comma, line feed or carriage return is
/// enclosed in double quotes, with embedded double quotes doubled. Any other
/// field is returned unchanged.
///
/// A field whose only special character is a double quote is enclosed too:
/// doubled quotes are only an escape sequence inside an enclosed field, so
/// leaving such a field bare would change its value when parsed.
fn csv_escape(value: &str) -> String {
    let needs_quoting = value.contains(|ch: char| matches!(ch, '"' | ',' | '\n' | '\r'));
    if !needs_quoting {
        return value.to_string();
    }
    let doubled = value.replace('"', "\"\"");
    let mut quoted = String::with_capacity(doubled.len() + 2);
    quoted.push('"');
    quoted.push_str(&doubled);
    quoted.push('"');
    quoted
}