floe_core/checks/
mod.rs

1mod cast;
2mod not_null;
3mod unique;
4
5use polars::prelude::{BooleanChunked, NamedFrom, NewChunkedArray, Series};
6
7pub use cast::cast_mismatch_errors;
8pub use not_null::not_null_errors;
9pub use unique::unique_errors;
10
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct RowError {
13    pub rule: String,
14    pub column: String,
15    pub message: String,
16}
17
18impl RowError {
19    pub fn new(rule: &str, column: &str, message: &str) -> Self {
20        Self {
21            rule: rule.to_string(),
22            column: column.to_string(),
23            message: message.to_string(),
24        }
25    }
26
27    pub fn to_json(&self) -> String {
28        format!(
29            "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
30            json_escape(&self.rule),
31            json_escape(&self.column),
32            json_escape(&self.message)
33        )
34    }
35}
36
37pub fn build_error_state(errors_per_row: &[Vec<RowError>]) -> (Vec<bool>, Vec<Option<String>>) {
38    let mut accept_rows = Vec::with_capacity(errors_per_row.len());
39    let mut errors_json = Vec::with_capacity(errors_per_row.len());
40    for errors in errors_per_row {
41        if errors.is_empty() {
42            accept_rows.push(true);
43            errors_json.push(None);
44        } else {
45            accept_rows.push(false);
46            let json_items = errors
47                .iter()
48                .map(RowError::to_json)
49                .collect::<Vec<_>>()
50                .join(",");
51            errors_json.push(Some(format!("[{}]", json_items)));
52        }
53    }
54    (accept_rows, errors_json)
55}
56
57pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
58    let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
59    let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
60    let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
61    (accept_mask, reject_mask)
62}
63
64pub fn rejected_error_columns(
65    errors_per_row: &[Option<String>],
66    include_all_rows: bool,
67) -> (Series, Series) {
68    if include_all_rows {
69        let mut row_index = Vec::with_capacity(errors_per_row.len());
70        let mut errors = Vec::with_capacity(errors_per_row.len());
71        for (idx, err) in errors_per_row.iter().enumerate() {
72            row_index.push(idx as u64);
73            errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
74        }
75        (
76            Series::new("__floe_row_index".into(), row_index),
77            Series::new("__floe_errors".into(), errors),
78        )
79    } else {
80        let mut row_index = Vec::new();
81        let mut errors = Vec::new();
82        for (idx, err) in errors_per_row.iter().enumerate() {
83            if let Some(err) = err {
84                row_index.push(idx as u64);
85                errors.push(err.clone());
86            }
87        }
88        (
89            Series::new("__floe_row_index".into(), row_index),
90            Series::new("__floe_errors".into(), errors),
91        )
92    }
93}
94
/// Escapes `value` so it can be embedded between double quotes in a JSON string.
///
/// Backslashes and double quotes are backslash-escaped. Control characters
/// (U+0000 through U+001F) are also escaped, because JSON forbids them raw
/// inside a string: the common ones use their short forms (`\n`, `\r`, `\t`,
/// `\b`, `\f`) and the rest use `\u00XX`. All other characters, including
/// non-ASCII ones, are copied through unchanged.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            // Remaining control characters have no short form in JSON.
            other if other < '\u{20}' => {
                escaped.push_str(&format!("\\u{:04x}", other as u32));
            }
            other => escaped.push(other),
        }
    }
    escaped
}
98
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config;
    use polars::df;
    use polars::prelude::{DataFrame, NamedFrom, Series};

    /// Builds a nullable column definition with the given name, type and
    /// uniqueness setting.
    fn nullable_column(
        name: &str,
        column_type: &str,
        unique: Option<bool>,
    ) -> config::ColumnConfig {
        config::ColumnConfig {
            name: name.to_string(),
            column_type: column_type.to_string(),
            nullable: Some(true),
            unique,
        }
    }

    /// A null in a required column yields one `not_null` error on that row only.
    #[test]
    fn not_null_errors_flags_missing_required_values() {
        let frame = df!("customer_id" => &[Some("A"), None, Some("B")]).expect("create df");
        let required_columns = vec!["customer_id".to_string()];

        let row_errors = not_null_errors(&frame, &required_columns).expect("not_null_errors");

        let expected = vec![RowError::new(
            "not_null",
            "customer_id",
            "required value missing",
        )];
        assert!(row_errors[0].is_empty());
        assert_eq!(row_errors[1], expected);
        assert!(row_errors[2].is_empty());
    }

    /// A raw value whose typed counterpart is null yields one `cast_error`.
    #[test]
    fn cast_mismatch_errors_flags_invalid_casts() {
        let raw_df = df!("created_at" => &["2024-01-01", "bad-date"]).expect("raw df");
        let typed_column = Series::new("created_at".into(), &[Some(1i64), None]);
        let typed_df = DataFrame::new(vec![typed_column.into()]).expect("typed df");
        let columns = vec![nullable_column("created_at", "datetime", None)];

        let row_errors = cast_mismatch_errors(&raw_df, &typed_df, &columns).expect("cast errors");

        let expected = vec![RowError::new(
            "cast_error",
            "created_at",
            "invalid value for target type",
        )];
        assert!(row_errors[0].is_empty());
        assert_eq!(row_errors[1], expected);
    }

    /// Repeated values are flagged from their second occurrence on; the null
    /// row is not flagged.
    #[test]
    fn unique_errors_flags_duplicates_after_first() {
        let frame = df!(
            "order_id" => &[Some("o-1"), Some("o-2"), Some("o-1"), None, Some("o-2")]
        )
        .expect("create df");
        let columns = vec![nullable_column("order_id", "string", Some(true))];

        let row_errors = unique_errors(&frame, &columns).expect("unique errors");

        let duplicate = vec![RowError::new("unique", "order_id", "duplicate value")];
        assert!(row_errors[0].is_empty());
        assert!(row_errors[1].is_empty());
        assert_eq!(row_errors[2], duplicate);
        assert!(row_errors[3].is_empty());
        assert_eq!(row_errors[4], duplicate);
    }

    /// Rows without errors are accepted with no payload; rows with errors are
    /// rejected and carry a payload mentioning the rule.
    #[test]
    fn build_error_state_builds_masks() {
        let missing_customer = RowError::new("not_null", "customer_id", "required value missing");
        let per_row = vec![vec![], vec![missing_customer]];

        let (accept_rows, errors_json) = build_error_state(&per_row);

        assert_eq!(accept_rows, vec![true, false]);
        assert!(errors_json[0].is_none());
        assert!(errors_json[1].as_ref().unwrap().contains("not_null"));
    }
}