Skip to main content

floe_core/checks/
mod.rs

1mod cast;
2mod mismatch;
3mod not_null;
4mod unique;
5
6use polars::prelude::{BooleanChunked, NamedFrom, NewChunkedArray, Series};
7
8pub use cast::cast_mismatch_errors;
9pub use mismatch::{apply_schema_mismatch, MismatchOutcome};
10pub use not_null::not_null_errors;
11pub use unique::unique_errors;
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct RowError {
15    pub rule: String,
16    pub column: String,
17    pub message: String,
18}
19
20impl RowError {
21    pub fn new(rule: &str, column: &str, message: &str) -> Self {
22        Self {
23            rule: rule.to_string(),
24            column: column.to_string(),
25            message: message.to_string(),
26        }
27    }
28
29    pub fn to_json(&self) -> String {
30        format!(
31            "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
32            json_escape(&self.rule),
33            json_escape(&self.column),
34            json_escape(&self.message)
35        )
36    }
37}
38
39pub fn build_error_state(errors_per_row: &[Vec<RowError>]) -> (Vec<bool>, Vec<Option<String>>) {
40    let mut accept_rows = Vec::with_capacity(errors_per_row.len());
41    let mut errors_json = Vec::with_capacity(errors_per_row.len());
42    for errors in errors_per_row {
43        if errors.is_empty() {
44            accept_rows.push(true);
45            errors_json.push(None);
46        } else {
47            accept_rows.push(false);
48            let json_items = errors
49                .iter()
50                .map(RowError::to_json)
51                .collect::<Vec<_>>()
52                .join(",");
53            errors_json.push(Some(format!("[{}]", json_items)));
54        }
55    }
56    (accept_rows, errors_json)
57}
58
59pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
60    let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
61    let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
62    let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
63    (accept_mask, reject_mask)
64}
65
66pub fn rejected_error_columns(
67    errors_per_row: &[Option<String>],
68    include_all_rows: bool,
69) -> (Series, Series) {
70    if include_all_rows {
71        let mut row_index = Vec::with_capacity(errors_per_row.len());
72        let mut errors = Vec::with_capacity(errors_per_row.len());
73        for (idx, err) in errors_per_row.iter().enumerate() {
74            row_index.push(idx as u64);
75            errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
76        }
77        (
78            Series::new("__floe_row_index".into(), row_index),
79            Series::new("__floe_errors".into(), errors),
80        )
81    } else {
82        let mut row_index = Vec::new();
83        let mut errors = Vec::new();
84        for (idx, err) in errors_per_row.iter().enumerate() {
85            if let Some(err) = err {
86                row_index.push(idx as u64);
87                errors.push(err.clone());
88            }
89        }
90        (
91            Series::new("__floe_row_index".into(), row_index),
92            Series::new("__floe_errors".into(), errors),
93        )
94    }
95}
96
/// Escapes `value` so it can be embedded between double quotes in a JSON string.
///
/// Backslash and double quote are backslash-escaped. Control characters
/// (U+0000 through U+001F) are also escaped, because JSON forbids them raw
/// inside a string: newline, carriage return and tab use their short forms,
/// every other control character is written as `\u00XX`. All other
/// characters are copied through unchanged.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            // Remaining control characters have no short form; use \u00XX.
            c if c < '\u{20}' => escaped.push_str(&format!("\\u{:04x}", c as u32)),
            c => escaped.push(c),
        }
    }
    escaped
}
100
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config;
    use polars::df;
    use polars::prelude::{DataFrame, NamedFrom, Series};

    /// A null in a required column is reported on that row only.
    #[test]
    fn not_null_errors_flags_missing_required_values() {
        let required = vec!["customer_id".to_string()];
        let frame = df!(
            "customer_id" => &[Some("A"), None, Some("B")]
        )
        .expect("create df");

        let per_row = not_null_errors(&frame, &required).expect("not_null_errors");

        let expected = RowError::new("not_null", "customer_id", "required value missing");
        assert!(per_row[0].is_empty());
        assert_eq!(per_row[1], vec![expected]);
        assert!(per_row[2].is_empty());
    }

    /// A raw value whose typed counterpart is null is reported as a cast error.
    #[test]
    fn cast_mismatch_errors_flags_invalid_casts() {
        let raw_frame = df!(
            "created_at" => &["2024-01-01", "bad-date"]
        )
        .expect("raw df");
        let typed_column = Series::new("created_at".into(), &[Some(1i64), None]);
        let typed_frame = DataFrame::new(vec![typed_column.into()]).expect("typed df");

        let schema = vec![config::ColumnConfig {
            name: "created_at".to_string(),
            column_type: "datetime".to_string(),
            nullable: Some(true),
            unique: None,
        }];

        let per_row =
            cast_mismatch_errors(&raw_frame, &typed_frame, &schema).expect("cast errors");

        let expected = RowError::new("cast_error", "created_at", "invalid value for target type");
        assert!(per_row[0].is_empty());
        assert_eq!(per_row[1], vec![expected]);
    }

    /// Repeated values are flagged from their second occurrence on; the null
    /// row in this fixture is not flagged.
    #[test]
    fn unique_errors_flags_duplicates_after_first() {
        let frame = df!(
            "order_id" => &[Some("o-1"), Some("o-2"), Some("o-1"), None, Some("o-2")]
        )
        .expect("create df");

        let schema = vec![config::ColumnConfig {
            name: "order_id".to_string(),
            column_type: "string".to_string(),
            nullable: Some(true),
            unique: Some(true),
        }];

        let per_row = unique_errors(&frame, &schema).expect("unique errors");

        let duplicate = RowError::new("unique", "order_id", "duplicate value");
        assert!(per_row[0].is_empty());
        assert!(per_row[1].is_empty());
        assert_eq!(per_row[2], vec![duplicate.clone()]);
        assert!(per_row[3].is_empty());
        assert_eq!(per_row[4], vec![duplicate]);
    }

    /// Rows without errors are accepted with no JSON; rows with errors are
    /// rejected and their JSON mentions the failing rule.
    #[test]
    fn build_error_state_builds_masks() {
        let clean_row: Vec<RowError> = Vec::new();
        let failing_row = vec![RowError::new(
            "not_null",
            "customer_id",
            "required value missing",
        )];
        let per_row = vec![clean_row, failing_row];

        let (accept_rows, errors_json) = build_error_state(&per_row);

        assert_eq!(accept_rows, vec![true, false]);
        assert!(errors_json[0].is_none());
        assert!(errors_json[1].as_ref().unwrap().contains("not_null"));
    }
}