Skip to main content

floe_core/checks/
mod.rs

1mod cast;
2mod mismatch;
3mod not_null;
4mod unique;
5
6use polars::prelude::{BooleanChunked, DataFrame, NamedFrom, NewChunkedArray, Series};
7use std::collections::HashMap;
8
9use crate::{ConfigError, FloeResult};
10
11pub use cast::{cast_mismatch_counts, cast_mismatch_errors};
12pub use mismatch::{apply_schema_mismatch, MismatchOutcome};
13pub use not_null::{not_null_counts, not_null_errors};
14pub use unique::{unique_counts, unique_errors};
15
16pub type ColumnIndex = HashMap<String, usize>;
17
18pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
19    df.get_column_names()
20        .iter()
21        .enumerate()
22        .map(|(idx, name)| (name.to_string(), idx))
23        .collect()
24}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct RowError {
28    pub rule: String,
29    pub column: String,
30    pub message: String,
31}
32
33impl RowError {
34    pub fn new(rule: &str, column: &str, message: &str) -> Self {
35        Self {
36            rule: rule.to_string(),
37            column: column.to_string(),
38            message: message.to_string(),
39        }
40    }
41
42    pub fn to_json(&self) -> String {
43        format!(
44            "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
45            json_escape(&self.rule),
46            json_escape(&self.column),
47            json_escape(&self.message)
48        )
49    }
50}
51
52pub trait RowErrorFormatter {
53    fn format(&self, errors: &[RowError]) -> String;
54}
55
56pub struct JsonRowErrorFormatter;
57pub struct CsvRowErrorFormatter;
58pub struct TextRowErrorFormatter;
59
60impl RowErrorFormatter for JsonRowErrorFormatter {
61    fn format(&self, errors: &[RowError]) -> String {
62        let json_items = errors
63            .iter()
64            .map(RowError::to_json)
65            .collect::<Vec<_>>()
66            .join(",");
67        format!("[{}]", json_items)
68    }
69}
70
71impl RowErrorFormatter for CsvRowErrorFormatter {
72    fn format(&self, errors: &[RowError]) -> String {
73        let lines = errors
74            .iter()
75            .map(|error| {
76                format!(
77                    "{},{},{}",
78                    csv_escape(&error.rule),
79                    csv_escape(&error.column),
80                    csv_escape(&error.message)
81                )
82            })
83            .collect::<Vec<_>>()
84            .join("\n");
85        json_string(&lines)
86    }
87}
88
89impl RowErrorFormatter for TextRowErrorFormatter {
90    fn format(&self, errors: &[RowError]) -> String {
91        let text = errors
92            .iter()
93            .map(|error| format!("{}:{} {}", error.rule, error.column, error.message))
94            .collect::<Vec<_>>()
95            .join("; ");
96        json_string(&text)
97    }
98}
99
100pub fn row_error_formatter(name: &str) -> FloeResult<Box<dyn RowErrorFormatter>> {
101    match name {
102        "json" => Ok(Box::new(JsonRowErrorFormatter)),
103        "csv" => Ok(Box::new(CsvRowErrorFormatter)),
104        "text" => Ok(Box::new(TextRowErrorFormatter)),
105        other => Err(Box::new(ConfigError(format!(
106            "unsupported report.formatter: {other}"
107        )))),
108    }
109}
110
111pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
112    let mut accept_rows = Vec::with_capacity(errors_per_row.len());
113    for errors in errors_per_row {
114        accept_rows.push(errors.is_empty());
115    }
116    accept_rows
117}
118
119pub fn build_errors_json(
120    errors_per_row: &[Vec<RowError>],
121    accept_rows: &[bool],
122) -> Vec<Option<String>> {
123    build_errors_formatted(errors_per_row, accept_rows, &JsonRowErrorFormatter)
124}
125
126pub fn build_errors_formatted(
127    errors_per_row: &[Vec<RowError>],
128    accept_rows: &[bool],
129    formatter: &dyn RowErrorFormatter,
130) -> Vec<Option<String>> {
131    let mut errors_out = Vec::with_capacity(errors_per_row.len());
132    for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
133        if *accepted {
134            errors_out.push(None);
135            continue;
136        }
137        errors_out.push(Some(formatter.format(errors)));
138    }
139    errors_out
140}
141
142pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
143    let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
144    let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
145    let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
146    (accept_mask, reject_mask)
147}
148
149pub fn rejected_error_columns(
150    errors_per_row: &[Option<String>],
151    include_all_rows: bool,
152) -> (Series, Series) {
153    if include_all_rows {
154        let mut row_index = Vec::with_capacity(errors_per_row.len());
155        let mut errors = Vec::with_capacity(errors_per_row.len());
156        for (idx, err) in errors_per_row.iter().enumerate() {
157            row_index.push(idx as u64);
158            errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
159        }
160        (
161            Series::new("__floe_row_index".into(), row_index),
162            Series::new("__floe_errors".into(), errors),
163        )
164    } else {
165        let mut row_index = Vec::new();
166        let mut errors = Vec::new();
167        for (idx, err) in errors_per_row.iter().enumerate() {
168            if let Some(err) = err {
169                row_index.push(idx as u64);
170                errors.push(err.clone());
171            }
172        }
173        (
174            Series::new("__floe_row_index".into(), row_index),
175            Series::new("__floe_errors".into(), errors),
176        )
177    }
178}
179
/// Escapes `value` so it can be placed between the quotes of a JSON string.
///
/// Backslash and double quote are backslash-escaped; newline, carriage
/// return, tab, backspace and form feed use their short escapes; every other
/// control character below U+0020 is written as `\u00XX`. JSON forbids raw
/// control characters inside strings, so leaving them unescaped would produce
/// output that parsers reject. All other characters are copied unchanged.
fn json_escape(value: &str) -> String {
    // Single pass into one buffer; most inputs need no escaping at all, so
    // the input length is a good initial capacity.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            // Remaining C0 control characters have no short form.
            control if control < '\u{20}' => {
                escaped.push_str(&format!("\\u{:04x}", control as u32));
            }
            other => escaped.push(other),
        }
    }
    escaped
}
188
189fn json_string(value: &str) -> String {
190    format!("\"{}\"", json_escape(value))
191}
192
/// Escapes a single CSV field.
///
/// A field that contains a comma, a double quote, a line feed or a carriage
/// return is wrapped in double quotes, with every embedded double quote
/// doubled. Any other field is returned unchanged.
///
/// A field containing a double quote must be quoted: doubling the quote only
/// has its escaping meaning inside a quoted field.
fn csv_escape(value: &str) -> String {
    let needs_quoting = value
        .chars()
        .any(|ch| matches!(ch, ',' | '"' | '\n' | '\r'));
    if !needs_quoting {
        return value.to_string();
    }
    format!("\"{}\"", value.replace('"', "\"\""))
}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config;
    use polars::df;
    use polars::prelude::{DataFrame, NamedFrom, Series};

    /// A null in a required column is reported on that row only.
    #[test]
    fn not_null_errors_flags_missing_required_values() {
        let frame = df!(
            "customer_id" => &[Some("A"), None, Some("B")]
        )
        .expect("create df");
        let required_columns = vec!["customer_id".to_string()];
        let index = column_index_map(&frame);

        let row_errors =
            not_null_errors(&frame, &required_columns, &index).expect("not_null_errors");

        let expected_missing = vec![RowError::new(
            "not_null",
            "customer_id",
            "required value missing",
        )];
        assert!(row_errors[0].is_empty());
        assert_eq!(row_errors[1], expected_missing);
        assert!(row_errors[2].is_empty());
    }

    /// A raw value whose typed counterpart is null is reported as a cast error.
    #[test]
    fn cast_mismatch_errors_flags_invalid_casts() {
        let raw_frame = df!(
            "created_at" => &["2024-01-01", "bad-date"]
        )
        .expect("raw df");
        let typed_column = Series::new("created_at".into(), &[Some(1i64), None]);
        let typed_frame = DataFrame::new(vec![typed_column.into()]).expect("typed df");

        let column_configs = vec![config::ColumnConfig {
            name: "created_at".to_string(),
            column_type: "datetime".to_string(),
            nullable: Some(true),
            unique: None,
        }];

        let raw_index = column_index_map(&raw_frame);
        let typed_index = column_index_map(&typed_frame);
        let row_errors = cast_mismatch_errors(
            &raw_frame,
            &typed_frame,
            &column_configs,
            &raw_index,
            &typed_index,
        )
        .expect("cast errors");

        let expected_cast_error = vec![RowError::new(
            "cast_error",
            "created_at",
            "invalid value for target type",
        )];
        assert!(row_errors[0].is_empty());
        assert_eq!(row_errors[1], expected_cast_error);
    }

    /// Only the second and later occurrences of a value are flagged; nulls are
    /// not.
    #[test]
    fn unique_errors_flags_duplicates_after_first() {
        let frame = df!(
            "order_id" => &[Some("o-1"), Some("o-2"), Some("o-1"), None, Some("o-2")]
        )
        .expect("create df");

        let column_configs = vec![config::ColumnConfig {
            name: "order_id".to_string(),
            column_type: "string".to_string(),
            nullable: Some(true),
            unique: Some(true),
        }];

        let index = column_index_map(&frame);
        let row_errors = unique_errors(&frame, &column_configs, &index).expect("unique errors");

        let expected_duplicate = vec![RowError::new("unique", "order_id", "duplicate value")];
        assert!(row_errors[0].is_empty());
        assert!(row_errors[1].is_empty());
        assert_eq!(row_errors[2], expected_duplicate);
        assert!(row_errors[3].is_empty());
        assert_eq!(row_errors[4], expected_duplicate);
    }

    /// Accept flags and JSON error strings line up with the per-row errors.
    #[test]
    fn build_error_state_builds_masks() {
        let missing_customer = RowError::new("not_null", "customer_id", "required value missing");
        let errors_per_row = vec![vec![], vec![missing_customer]];

        let accept_rows = build_accept_rows(&errors_per_row);
        let errors_json = build_errors_json(&errors_per_row, &accept_rows);

        assert_eq!(accept_rows, vec![true, false]);
        assert!(errors_json[0].is_none());
        let rejected_json = errors_json[1].as_deref().unwrap();
        assert!(rejected_json.contains("not_null"));
    }

    /// CSV output is one record per error, newline-joined, inside a JSON string.
    #[test]
    fn csv_formatter_outputs_json_string() {
        let row_errors = vec![
            RowError::new("not_null", "customer_id", "missing"),
            RowError::new("unique", "order_id", "duplicate"),
        ];

        let rendered = CsvRowErrorFormatter.format(&row_errors);

        assert_eq!(
            rendered,
            "\"not_null,customer_id,missing\\nunique,order_id,duplicate\""
        );
    }

    /// Text output is `rule:column message` inside a JSON string.
    #[test]
    fn text_formatter_outputs_json_string() {
        let row_errors = vec![RowError::new("unique", "order_id", "duplicate")];

        let rendered = TextRowErrorFormatter.format(&row_errors);

        assert_eq!(rendered, "\"unique:order_id duplicate\"");
    }
}