1mod cast;
2mod mismatch;
3mod not_null;
4mod unique;
5
6use polars::prelude::{BooleanChunked, DataFrame, NamedFrom, NewChunkedArray, Series};
7use std::collections::HashMap;
8
9use crate::{ConfigError, FloeResult};
10
11pub use cast::{cast_mismatch_counts, cast_mismatch_errors};
12pub use mismatch::{apply_schema_mismatch, MismatchOutcome};
13pub use not_null::{not_null_counts, not_null_errors};
14pub use unique::{unique_counts, unique_errors};
15
16pub type ColumnIndex = HashMap<String, usize>;
17
18pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
19 df.get_column_names()
20 .iter()
21 .enumerate()
22 .map(|(idx, name)| (name.to_string(), idx))
23 .collect()
24}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct RowError {
28 pub rule: String,
29 pub column: String,
30 pub message: String,
31}
32
33impl RowError {
34 pub fn new(rule: &str, column: &str, message: &str) -> Self {
35 Self {
36 rule: rule.to_string(),
37 column: column.to_string(),
38 message: message.to_string(),
39 }
40 }
41
42 pub fn to_json(&self) -> String {
43 format!(
44 "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
45 json_escape(&self.rule),
46 json_escape(&self.column),
47 json_escape(&self.message)
48 )
49 }
50}
51
52pub trait RowErrorFormatter {
53 fn format(&self, errors: &[RowError]) -> String;
54}
55
/// Formats row errors as a JSON array of `{rule, column, message}` objects.
#[derive(Debug, Clone, Copy, Default)]
pub struct JsonRowErrorFormatter;

/// Formats row errors as `rule,column,message` CSV lines, wrapped in a JSON string.
#[derive(Debug, Clone, Copy, Default)]
pub struct CsvRowErrorFormatter;

/// Formats row errors as `rule:column message` entries joined by `"; "`,
/// wrapped in a JSON string.
#[derive(Debug, Clone, Copy, Default)]
pub struct TextRowErrorFormatter;
59
60impl RowErrorFormatter for JsonRowErrorFormatter {
61 fn format(&self, errors: &[RowError]) -> String {
62 let json_items = errors
63 .iter()
64 .map(RowError::to_json)
65 .collect::<Vec<_>>()
66 .join(",");
67 format!("[{}]", json_items)
68 }
69}
70
71impl RowErrorFormatter for CsvRowErrorFormatter {
72 fn format(&self, errors: &[RowError]) -> String {
73 let lines = errors
74 .iter()
75 .map(|error| {
76 format!(
77 "{},{},{}",
78 csv_escape(&error.rule),
79 csv_escape(&error.column),
80 csv_escape(&error.message)
81 )
82 })
83 .collect::<Vec<_>>()
84 .join("\n");
85 json_string(&lines)
86 }
87}
88
89impl RowErrorFormatter for TextRowErrorFormatter {
90 fn format(&self, errors: &[RowError]) -> String {
91 let text = errors
92 .iter()
93 .map(|error| format!("{}:{} {}", error.rule, error.column, error.message))
94 .collect::<Vec<_>>()
95 .join("; ");
96 json_string(&text)
97 }
98}
99
100pub fn row_error_formatter(name: &str) -> FloeResult<Box<dyn RowErrorFormatter>> {
101 match name {
102 "json" => Ok(Box::new(JsonRowErrorFormatter)),
103 "csv" => Ok(Box::new(CsvRowErrorFormatter)),
104 "text" => Ok(Box::new(TextRowErrorFormatter)),
105 other => Err(Box::new(ConfigError(format!(
106 "unsupported report.formatter: {other}"
107 )))),
108 }
109}
110
111pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
112 let mut accept_rows = Vec::with_capacity(errors_per_row.len());
113 for errors in errors_per_row {
114 accept_rows.push(errors.is_empty());
115 }
116 accept_rows
117}
118
119pub fn build_errors_json(
120 errors_per_row: &[Vec<RowError>],
121 accept_rows: &[bool],
122) -> Vec<Option<String>> {
123 build_errors_formatted(errors_per_row, accept_rows, &JsonRowErrorFormatter)
124}
125
126pub fn build_errors_formatted(
127 errors_per_row: &[Vec<RowError>],
128 accept_rows: &[bool],
129 formatter: &dyn RowErrorFormatter,
130) -> Vec<Option<String>> {
131 let mut errors_out = Vec::with_capacity(errors_per_row.len());
132 for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
133 if *accepted {
134 errors_out.push(None);
135 continue;
136 }
137 errors_out.push(Some(formatter.format(errors)));
138 }
139 errors_out
140}
141
142pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
143 let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
144 let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
145 let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
146 (accept_mask, reject_mask)
147}
148
149pub fn rejected_error_columns(
150 errors_per_row: &[Option<String>],
151 include_all_rows: bool,
152) -> (Series, Series) {
153 if include_all_rows {
154 let mut row_index = Vec::with_capacity(errors_per_row.len());
155 let mut errors = Vec::with_capacity(errors_per_row.len());
156 for (idx, err) in errors_per_row.iter().enumerate() {
157 row_index.push(idx as u64);
158 errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
159 }
160 (
161 Series::new("__floe_row_index".into(), row_index),
162 Series::new("__floe_errors".into(), errors),
163 )
164 } else {
165 let mut row_index = Vec::new();
166 let mut errors = Vec::new();
167 for (idx, err) in errors_per_row.iter().enumerate() {
168 if let Some(err) = err {
169 row_index.push(idx as u64);
170 errors.push(err.clone());
171 }
172 }
173 (
174 Series::new("__floe_row_index".into(), row_index),
175 Series::new("__floe_errors".into(), errors),
176 )
177 }
178}
179
/// Escapes `value` so it can be embedded between double quotes in JSON.
///
/// Backslash, double quote, newline, carriage return and tab use their short
/// escapes (`\\`, `\"`, `\n`, `\r`, `\t`). Every other control character below
/// U+0020 is written as `\u00XX`, because JSON forbids raw control characters
/// inside strings. All remaining characters are copied unchanged.
fn json_escape(value: &str) -> String {
    // Single pass over the input; most values need no escaping at all.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            // Remaining C0 controls have no short form handled here.
            c if c < '\u{20}' => escaped.push_str(&format!("\\u{:04x}", c as u32)),
            c => escaped.push(c),
        }
    }
    escaped
}
188
189fn json_string(value: &str) -> String {
190 format!("\"{}\"", json_escape(value))
191}
192
/// Escapes a single CSV field.
///
/// A field that contains a double quote, comma, line feed or carriage return
/// is enclosed in double quotes, with every embedded double quote doubled.
/// Any other field is returned unchanged.
///
/// Fields containing only a double quote must be enclosed as well: a bare
/// `a""b` is not a valid unquoted field, so readers would not recover `a"b`.
fn csv_escape(value: &str) -> String {
    let needs_quoting = value.contains(|c: char| matches!(c, '"' | ',' | '\n' | '\r'));
    if needs_quoting {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config;
    use polars::df;
    use polars::prelude::{DataFrame, NamedFrom, Series};

    /// Builds a `ColumnConfig` with `nullable: Some(true)`, as every test here uses.
    fn nullable_column(
        name: &str,
        column_type: &str,
        unique: Option<bool>,
    ) -> config::ColumnConfig {
        config::ColumnConfig {
            name: name.to_string(),
            column_type: column_type.to_string(),
            nullable: Some(true),
            unique,
        }
    }

    #[test]
    fn not_null_errors_flags_missing_required_values() {
        let frame = df!(
            "customer_id" => &[Some("A"), None, Some("B")]
        )
        .expect("create df");
        let required_columns = vec![String::from("customer_id")];
        let index = column_index_map(&frame);

        let row_errors =
            not_null_errors(&frame, &required_columns, &index).expect("not_null_errors");

        // Only the middle row has a null in the required column.
        let expected_missing = vec![RowError::new(
            "not_null",
            "customer_id",
            "required value missing",
        )];
        assert!(row_errors[0].is_empty());
        assert_eq!(row_errors[1], expected_missing);
        assert!(row_errors[2].is_empty());
    }

    #[test]
    fn cast_mismatch_errors_flags_invalid_casts() {
        let raw_frame = df!(
            "created_at" => &["2024-01-01", "bad-date"]
        )
        .expect("raw df");
        // The typed frame has a null where the raw value is "bad-date".
        let typed_column = Series::new("created_at".into(), &[Some(1i64), None]);
        let typed_frame = DataFrame::new(vec![typed_column.into()]).expect("typed df");

        let columns = vec![nullable_column("created_at", "datetime", None)];
        let raw_index = column_index_map(&raw_frame);
        let typed_index = column_index_map(&typed_frame);

        let row_errors = cast_mismatch_errors(
            &raw_frame,
            &typed_frame,
            &columns,
            &raw_index,
            &typed_index,
        )
        .expect("cast errors");

        let expected_cast_error = vec![RowError::new(
            "cast_error",
            "created_at",
            "invalid value for target type",
        )];
        assert!(row_errors[0].is_empty());
        assert_eq!(row_errors[1], expected_cast_error);
    }

    #[test]
    fn unique_errors_flags_duplicates_after_first() {
        let frame = df!(
            "order_id" => &[Some("o-1"), Some("o-2"), Some("o-1"), None, Some("o-2")]
        )
        .expect("create df");
        let columns = vec![nullable_column("order_id", "string", Some(true))];
        let index = column_index_map(&frame);

        let row_errors = unique_errors(&frame, &columns, &index).expect("unique errors");

        // First occurrences and the null row are clean; repeats are flagged.
        assert!(row_errors[0].is_empty());
        assert!(row_errors[1].is_empty());
        assert_eq!(
            row_errors[2],
            vec![RowError::new("unique", "order_id", "duplicate value")]
        );
        assert!(row_errors[3].is_empty());
        assert_eq!(
            row_errors[4],
            vec![RowError::new("unique", "order_id", "duplicate value")]
        );
    }

    #[test]
    fn build_error_state_builds_masks() {
        let clean_row = vec![];
        let failing_row = vec![RowError::new(
            "not_null",
            "customer_id",
            "required value missing",
        )];
        let errors = vec![clean_row, failing_row];

        let accept_rows = build_accept_rows(&errors);
        let errors_json = build_errors_json(&errors, &accept_rows);

        assert_eq!(accept_rows, vec![true, false]);
        assert!(errors_json[0].is_none());
        let rejected_json = errors_json[1].as_ref().unwrap();
        assert!(rejected_json.contains("not_null"));
    }

    #[test]
    fn csv_formatter_outputs_json_string() {
        let errors = vec![
            RowError::new("not_null", "customer_id", "missing"),
            RowError::new("unique", "order_id", "duplicate"),
        ];

        let formatted = CsvRowErrorFormatter.format(&errors);

        // The newline between CSV lines appears JSON-escaped inside the string.
        assert_eq!(
            formatted,
            "\"not_null,customer_id,missing\\nunique,order_id,duplicate\""
        );
    }

    #[test]
    fn text_formatter_outputs_json_string() {
        let errors = vec![RowError::new("unique", "order_id", "duplicate")];

        let formatted = TextRowErrorFormatter.format(&errors);

        assert_eq!(formatted, "\"unique:order_id duplicate\"");
    }
}