1mod cast;
2mod not_null;
3mod unique;
4
5use polars::prelude::{BooleanChunked, NamedFrom, NewChunkedArray, Series};
6
7pub use cast::cast_mismatch_errors;
8pub use not_null::not_null_errors;
9pub use unique::unique_errors;
10
/// A single validation failure attached to one row.
///
/// All three fields are owned strings so an error can outlive the data it
/// was derived from.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RowError {
    /// Identifier of the rule that produced the error.
    pub rule: String,
    /// Name of the column the error refers to.
    pub column: String,
    /// Human-readable description of the failure.
    pub message: String,
}
17
18impl RowError {
19 pub fn new(rule: &str, column: &str, message: &str) -> Self {
20 Self {
21 rule: rule.to_string(),
22 column: column.to_string(),
23 message: message.to_string(),
24 }
25 }
26
27 pub fn to_json(&self) -> String {
28 format!(
29 "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
30 json_escape(&self.rule),
31 json_escape(&self.column),
32 json_escape(&self.message)
33 )
34 }
35}
36
37pub fn build_error_state(
38 errors_per_row: &[Vec<RowError>],
39) -> (Vec<bool>, Vec<Option<String>>) {
40 let mut accept_rows = Vec::with_capacity(errors_per_row.len());
41 let mut errors_json = Vec::with_capacity(errors_per_row.len());
42 for errors in errors_per_row {
43 if errors.is_empty() {
44 accept_rows.push(true);
45 errors_json.push(None);
46 } else {
47 accept_rows.push(false);
48 let json_items = errors
49 .iter()
50 .map(RowError::to_json)
51 .collect::<Vec<_>>()
52 .join(",");
53 errors_json.push(Some(format!("[{}]", json_items)));
54 }
55 }
56 (accept_rows, errors_json)
57}
58
59pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
60 let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
61 let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
62 let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
63 (accept_mask, reject_mask)
64}
65
66pub fn rejected_error_columns(
67 errors_per_row: &[Option<String>],
68 include_all_rows: bool,
69) -> (Series, Series) {
70 if include_all_rows {
71 let mut row_index = Vec::with_capacity(errors_per_row.len());
72 let mut errors = Vec::with_capacity(errors_per_row.len());
73 for (idx, err) in errors_per_row.iter().enumerate() {
74 row_index.push(idx as u64);
75 errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
76 }
77 (
78 Series::new("__floe_row_index".into(), row_index),
79 Series::new("__floe_errors".into(), errors),
80 )
81 } else {
82 let mut row_index = Vec::new();
83 let mut errors = Vec::new();
84 for (idx, err) in errors_per_row.iter().enumerate() {
85 if let Some(err) = err {
86 row_index.push(idx as u64);
87 errors.push(err.clone());
88 }
89 }
90 (
91 Series::new("__floe_row_index".into(), row_index),
92 Series::new("__floe_errors".into(), errors),
93 )
94 }
95}
96
/// Escapes `value` so it can be embedded between double quotes in a JSON
/// document.
///
/// Backslash and double quote are escaped, and so is every control character
/// in U+0000..=U+001F, which JSON forbids from appearing raw inside a string.
/// The common ones use their short forms (`\n`, `\r`, `\t`, `\b`, `\f`); the
/// rest are written as `\u00XX`. All other characters are copied unchanged.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            // Remaining control characters have no short form in JSON.
            '\u{00}'..='\u{1F}' => escaped.push_str(&format!("\\u{:04x}", ch as u32)),
            other => escaped.push(other),
        }
    }
    escaped
}
100
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config;
    use polars::df;
    use polars::prelude::{DataFrame, NamedFrom, Series};

    /// Expected value for a row that carries exactly one error.
    fn single(rule: &str, column: &str, message: &str) -> Vec<RowError> {
        vec![RowError::new(rule, column, message)]
    }

    // A null in a required column is reported on that row only.
    #[test]
    fn not_null_errors_flags_missing_required_values() {
        let frame = df!(
            "customer_id" => &[Some("A"), None, Some("B")]
        )
        .expect("create df");
        let required_columns = vec!["customer_id".to_string()];

        let per_row = not_null_errors(&frame, &required_columns).expect("not_null_errors");

        assert!(per_row[0].is_empty());
        assert_eq!(
            per_row[1],
            single("not_null", "customer_id", "required value missing")
        );
        assert!(per_row[2].is_empty());
    }

    // A raw value that is present but null in the typed frame is a cast error.
    #[test]
    fn cast_mismatch_errors_flags_invalid_casts() {
        let raw_frame = df!(
            "created_at" => &["2024-01-01", "bad-date"]
        )
        .expect("raw df");
        let typed_column = Series::new("created_at".into(), &[Some(1i64), None]);
        let typed_frame = DataFrame::new(vec![typed_column.into()]).expect("typed df");

        let schema = vec![config::ColumnConfig {
            name: "created_at".to_string(),
            column_type: "datetime".to_string(),
            nullable: Some(true),
            unique: None,
        }];

        let per_row =
            cast_mismatch_errors(&raw_frame, &typed_frame, &schema).expect("cast errors");

        assert!(per_row[0].is_empty());
        assert_eq!(
            per_row[1],
            single("cast_error", "created_at", "invalid value for target type")
        );
    }

    // First occurrences and nulls pass; later repeats are flagged.
    #[test]
    fn unique_errors_flags_duplicates_after_first() {
        let frame = df!(
            "order_id" => &[Some("o-1"), Some("o-2"), Some("o-1"), None, Some("o-2")]
        )
        .expect("create df");

        let schema = vec![config::ColumnConfig {
            name: "order_id".to_string(),
            column_type: "string".to_string(),
            nullable: Some(true),
            unique: Some(true),
        }];

        let per_row = unique_errors(&frame, &schema).expect("unique errors");
        let duplicate = single("unique", "order_id", "duplicate value");

        assert!(per_row[0].is_empty());
        assert!(per_row[1].is_empty());
        assert_eq!(per_row[2], duplicate);
        assert!(per_row[3].is_empty());
        assert_eq!(per_row[4], duplicate);
    }

    // Rows without errors are accepted with no payload; rows with errors are
    // rejected and carry a JSON payload naming the rule.
    #[test]
    fn build_error_state_builds_masks() {
        let per_row = vec![
            Vec::new(),
            single("not_null", "customer_id", "required value missing"),
        ];

        let (accept_rows, errors_json) = build_error_state(&per_row);

        assert_eq!(accept_rows, vec![true, false]);
        assert!(errors_json[0].is_none());
        let payload = errors_json[1].as_deref().unwrap();
        assert!(payload.contains("not_null"));
    }
}