1mod cast;
2mod not_null;
3mod unique;
4
5use polars::prelude::{BooleanChunked, NamedFrom, NewChunkedArray, Series};
6
7pub use cast::cast_mismatch_errors;
8pub use not_null::not_null_errors;
9pub use unique::unique_errors;
10
/// One validation failure attached to a single input row.
///
/// `rule` identifies the check that failed (this module's tests use `"not_null"`,
/// `"cast_error"` and `"unique"`), `column` names the column it concerns, and `message`
/// carries the human-readable detail.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RowError {
    /// Identifier of the rule that flagged the row.
    pub rule: String,
    /// Name of the column the failure refers to.
    pub column: String,
    /// Human-readable description of the failure.
    pub message: String,
}
17
18impl RowError {
19 pub fn new(rule: &str, column: &str, message: &str) -> Self {
20 Self {
21 rule: rule.to_string(),
22 column: column.to_string(),
23 message: message.to_string(),
24 }
25 }
26
27 pub fn to_json(&self) -> String {
28 format!(
29 "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
30 json_escape(&self.rule),
31 json_escape(&self.column),
32 json_escape(&self.message)
33 )
34 }
35}
36
37pub fn build_error_state(errors_per_row: &[Vec<RowError>]) -> (Vec<bool>, Vec<Option<String>>) {
38 let mut accept_rows = Vec::with_capacity(errors_per_row.len());
39 let mut errors_json = Vec::with_capacity(errors_per_row.len());
40 for errors in errors_per_row {
41 if errors.is_empty() {
42 accept_rows.push(true);
43 errors_json.push(None);
44 } else {
45 accept_rows.push(false);
46 let json_items = errors
47 .iter()
48 .map(RowError::to_json)
49 .collect::<Vec<_>>()
50 .join(",");
51 errors_json.push(Some(format!("[{}]", json_items)));
52 }
53 }
54 (accept_rows, errors_json)
55}
56
57pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
58 let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
59 let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
60 let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
61 (accept_mask, reject_mask)
62}
63
64pub fn rejected_error_columns(
65 errors_per_row: &[Option<String>],
66 include_all_rows: bool,
67) -> (Series, Series) {
68 if include_all_rows {
69 let mut row_index = Vec::with_capacity(errors_per_row.len());
70 let mut errors = Vec::with_capacity(errors_per_row.len());
71 for (idx, err) in errors_per_row.iter().enumerate() {
72 row_index.push(idx as u64);
73 errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
74 }
75 (
76 Series::new("__floe_row_index".into(), row_index),
77 Series::new("__floe_errors".into(), errors),
78 )
79 } else {
80 let mut row_index = Vec::new();
81 let mut errors = Vec::new();
82 for (idx, err) in errors_per_row.iter().enumerate() {
83 if let Some(err) = err {
84 row_index.push(idx as u64);
85 errors.push(err.clone());
86 }
87 }
88 (
89 Series::new("__floe_row_index".into(), row_index),
90 Series::new("__floe_errors".into(), errors),
91 )
92 }
93}
94
/// Escapes `value` so it can be placed between double quotes inside a JSON string literal.
///
/// Escaping rules:
/// - Backslash and double quote are backslash-escaped.
/// - The common control characters use their short escapes (`\n`, `\r`, `\t`, `\b`, `\f`).
/// - Any other character below U+0020 becomes a `\u00XX` escape. RFC 8259 §7 forbids raw
///   control characters in JSON strings, so without this, names or messages containing e.g. a
///   newline would serialize to invalid JSON.
///
/// All other characters, including non-ASCII text, pass through unchanged.
fn json_escape(value: &str) -> String {
    use std::fmt::Write as _;

    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            c if u32::from(c) < 0x20 => {
                // Writing into a String cannot fail; the fmt::Result is always Ok.
                let _ = write!(escaped, "\\u{:04x}", u32::from(c));
            }
            c => escaped.push(c),
        }
    }
    escaped
}
98
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config;
    use polars::df;
    use polars::prelude::{DataFrame, NamedFrom, Series};

    /// Builds a nullable column config; only the fields these tests vary are parameters.
    fn nullable_column(
        name: &str,
        column_type: &str,
        unique: Option<bool>,
    ) -> config::ColumnConfig {
        config::ColumnConfig {
            name: name.to_string(),
            column_type: column_type.to_string(),
            nullable: Some(true),
            unique,
        }
    }

    #[test]
    fn not_null_errors_flags_missing_required_values() {
        let frame = df!("customer_id" => &[Some("A"), None, Some("B")]).expect("create df");
        let required_columns = vec!["customer_id".to_string()];

        let per_row = not_null_errors(&frame, &required_columns).expect("not_null_errors");

        let missing = RowError::new("not_null", "customer_id", "required value missing");
        assert!(per_row[0].is_empty());
        assert_eq!(per_row[1], vec![missing]);
        assert!(per_row[2].is_empty());
    }

    #[test]
    fn cast_mismatch_errors_flags_invalid_casts() {
        let raw_frame = df!("created_at" => &["2024-01-01", "bad-date"]).expect("raw df");
        let typed_column = Series::new("created_at".into(), &[Some(1i64), None]);
        let typed_frame = DataFrame::new(vec![typed_column.into()]).expect("typed df");
        let columns = vec![nullable_column("created_at", "datetime", None)];

        let per_row =
            cast_mismatch_errors(&raw_frame, &typed_frame, &columns).expect("cast errors");

        let invalid = RowError::new("cast_error", "created_at", "invalid value for target type");
        assert!(per_row[0].is_empty());
        assert_eq!(per_row[1], vec![invalid]);
    }

    #[test]
    fn unique_errors_flags_duplicates_after_first() {
        let frame = df!(
            "order_id" => &[Some("o-1"), Some("o-2"), Some("o-1"), None, Some("o-2")]
        )
        .expect("create df");
        let columns = vec![nullable_column("order_id", "string", Some(true))];

        let per_row = unique_errors(&frame, &columns).expect("unique errors");

        let duplicate = vec![RowError::new("unique", "order_id", "duplicate value")];
        for clean_row in [0usize, 1, 3] {
            assert!(per_row[clean_row].is_empty());
        }
        assert_eq!(per_row[2], duplicate);
        assert_eq!(per_row[4], duplicate);
    }

    #[test]
    fn build_error_state_builds_masks() {
        let per_row: Vec<Vec<RowError>> = vec![
            Vec::new(),
            vec![RowError::new(
                "not_null",
                "customer_id",
                "required value missing",
            )],
        ];

        let (accept_rows, errors_json) = build_error_state(&per_row);

        assert_eq!(accept_rows, vec![true, false]);
        assert_eq!(errors_json[0], None);
        let rejected = errors_json[1]
            .as_deref()
            .expect("rejected row carries errors json");
        assert!(rejected.contains("not_null"));
    }
}