// floe_core/checks/mod.rs

mod cast;
mod not_null;
mod unique;

use std::fmt::Write as _;

use polars::prelude::{BooleanChunked, NamedFrom, NewChunkedArray, Series};

pub use cast::cast_mismatch_errors;
pub use not_null::not_null_errors;
pub use unique::unique_errors;

11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct RowError {
13    pub rule: String,
14    pub column: String,
15    pub message: String,
16}
17
18impl RowError {
19    pub fn new(rule: &str, column: &str, message: &str) -> Self {
20        Self {
21            rule: rule.to_string(),
22            column: column.to_string(),
23            message: message.to_string(),
24        }
25    }
26
27    pub fn to_json(&self) -> String {
28        format!(
29            "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
30            json_escape(&self.rule),
31            json_escape(&self.column),
32            json_escape(&self.message)
33        )
34    }
35}
36
37pub fn build_error_state(
38    errors_per_row: &[Vec<RowError>],
39) -> (Vec<bool>, Vec<Option<String>>) {
40    let mut accept_rows = Vec::with_capacity(errors_per_row.len());
41    let mut errors_json = Vec::with_capacity(errors_per_row.len());
42    for errors in errors_per_row {
43        if errors.is_empty() {
44            accept_rows.push(true);
45            errors_json.push(None);
46        } else {
47            accept_rows.push(false);
48            let json_items = errors
49                .iter()
50                .map(RowError::to_json)
51                .collect::<Vec<_>>()
52                .join(",");
53            errors_json.push(Some(format!("[{}]", json_items)));
54        }
55    }
56    (accept_rows, errors_json)
57}
58
59pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
60    let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
61    let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
62    let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
63    (accept_mask, reject_mask)
64}
65
66pub fn rejected_error_columns(
67    errors_per_row: &[Option<String>],
68    include_all_rows: bool,
69) -> (Series, Series) {
70    if include_all_rows {
71        let mut row_index = Vec::with_capacity(errors_per_row.len());
72        let mut errors = Vec::with_capacity(errors_per_row.len());
73        for (idx, err) in errors_per_row.iter().enumerate() {
74            row_index.push(idx as u64);
75            errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
76        }
77        (
78            Series::new("__floe_row_index".into(), row_index),
79            Series::new("__floe_errors".into(), errors),
80        )
81    } else {
82        let mut row_index = Vec::new();
83        let mut errors = Vec::new();
84        for (idx, err) in errors_per_row.iter().enumerate() {
85            if let Some(err) = err {
86                row_index.push(idx as u64);
87                errors.push(err.clone());
88            }
89        }
90        (
91            Series::new("__floe_row_index".into(), row_index),
92            Series::new("__floe_errors".into(), errors),
93        )
94    }
95}
96
/// Escapes `value` so it can be placed between double quotes in a JSON string.
///
/// RFC 8259 §7 requires escaping the quotation mark, the reverse solidus, and
/// every control character in U+0000..=U+001F. Escaping works as follows:
/// - The common control characters use their short escapes (`\n`, `\r`, `\t`,
///   `\b`, `\f`).
/// - The remaining control characters use the `\u00XX` form.
/// - Every other character, including non-ASCII text, is copied through
///   unchanged.
///
/// This matters because column names and messages may come from user data.
/// An unescaped newline or tab would make the surrounding JSON invalid.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            control if u32::from(control) < 0x20 => {
                write!(escaped, "\\u{:04x}", u32::from(control))
                    .expect("writing to a String is infallible");
            }
            other => escaped.push(other),
        }
    }
    escaped
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config;
    use polars::df;
    use polars::prelude::{DataFrame, NamedFrom, Series};

    /// Expected per-row result when exactly one check failed on that row.
    fn single_error(rule: &str, column: &str, message: &str) -> Vec<RowError> {
        vec![RowError::new(rule, column, message)]
    }

    /// Nullable column configuration shared by the cast and unique tests.
    fn nullable_column(
        name: &str,
        column_type: &str,
        unique: Option<bool>,
    ) -> config::ColumnConfig {
        config::ColumnConfig {
            name: name.to_string(),
            column_type: column_type.to_string(),
            nullable: Some(true),
            unique,
        }
    }

    #[test]
    fn not_null_errors_flags_missing_required_values() {
        let required = vec!["customer_id".to_string()];
        let df = df!("customer_id" => &[Some("A"), None, Some("B")]).expect("create df");

        let errors = not_null_errors(&df, &required).expect("not_null_errors");

        for clean_row in [0usize, 2] {
            assert!(errors[clean_row].is_empty());
        }
        assert_eq!(
            errors[1],
            single_error("not_null", "customer_id", "required value missing")
        );
    }

    #[test]
    fn cast_mismatch_errors_flags_invalid_casts() {
        let raw_df = df!("created_at" => &["2024-01-01", "bad-date"]).expect("raw df");
        // Row 1 simulates a value that failed to cast: it is null after typing.
        let typed_df = DataFrame::new(vec![
            Series::new("created_at".into(), &[Some(1i64), None]).into(),
        ])
        .expect("typed df");
        let columns = vec![nullable_column("created_at", "datetime", None)];

        let errors = cast_mismatch_errors(&raw_df, &typed_df, &columns).expect("cast errors");

        assert!(errors[0].is_empty());
        assert_eq!(
            errors[1],
            single_error("cast_error", "created_at", "invalid value for target type")
        );
    }

    #[test]
    fn unique_errors_flags_duplicates_after_first() {
        let df = df!(
            "order_id" => &[Some("o-1"), Some("o-2"), Some("o-1"), None, Some("o-2")]
        )
        .expect("create df");
        let columns = vec![nullable_column("order_id", "string", Some(true))];

        let errors = unique_errors(&df, &columns).expect("unique errors");

        // First occurrences and the null row are not reported.
        for clean_row in [0usize, 1, 3] {
            assert!(errors[clean_row].is_empty());
        }
        for duplicate_row in [2usize, 4] {
            assert_eq!(
                errors[duplicate_row],
                single_error("unique", "order_id", "duplicate value")
            );
        }
    }

    #[test]
    fn build_error_state_builds_masks() {
        let errors = vec![
            Vec::new(),
            single_error("not_null", "customer_id", "required value missing"),
        ];

        let (accept_rows, errors_json) = build_error_state(&errors);

        assert_eq!(accept_rows, [true, false]);
        assert_eq!(errors_json[0], None);
        let rejected_json = errors_json[1].as_deref().expect("row 1 has errors");
        assert!(rejected_json.contains("not_null"));
    }
}