1mod cast;
2mod mismatch;
3pub mod normalize;
4mod not_null;
5mod unique;
6
7use polars::prelude::{BooleanChunked, DataFrame, NamedFrom, NewChunkedArray, Series};
8use std::collections::{BTreeMap, HashMap};
9
10use crate::{ConfigError, FloeResult};
11
12pub use cast::{cast_mismatch_counts, cast_mismatch_errors, cast_mismatch_errors_sparse};
13pub use mismatch::{
14 apply_mismatch_plan, apply_schema_mismatch, plan_schema_mismatch, resolve_mismatch_columns,
15 top_level_declared_columns, MismatchOutcome,
16};
17pub use not_null::{not_null_counts, not_null_errors, not_null_errors_sparse};
18pub use unique::{unique_counts, unique_errors, unique_errors_sparse, UniqueTracker};
19
/// Lookup table from a column name to a `usize` position.
pub type ColumnIndex = HashMap<String, usize>;
21
22pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
23 df.get_column_names()
24 .iter()
25 .enumerate()
26 .map(|(idx, name)| (name.to_string(), idx))
27 .collect()
28}
29
/// A single validation failure attached to one row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Name of the rule that produced the error.
    pub rule: String,
    /// Name of the column the error refers to.
    pub column: String,
    /// Human-readable description of the failure.
    pub message: String,
}
36
37#[derive(Debug, Clone, Default)]
38pub struct SparseRowErrors {
39 row_count: usize,
40 rows: BTreeMap<usize, Vec<RowError>>,
41}
42
43impl SparseRowErrors {
44 pub fn new(row_count: usize) -> Self {
45 Self {
46 row_count,
47 rows: BTreeMap::new(),
48 }
49 }
50
51 pub fn is_empty(&self) -> bool {
52 self.rows.is_empty()
53 }
54
55 pub fn add_error(&mut self, row_idx: usize, error: RowError) {
56 self.rows.entry(row_idx).or_default().push(error);
57 }
58
59 pub fn add_errors(&mut self, row_idx: usize, errors: Vec<RowError>) {
60 if errors.is_empty() {
61 return;
62 }
63 self.rows.entry(row_idx).or_default().extend(errors);
64 }
65
66 pub fn merge(&mut self, other: SparseRowErrors) {
67 for (row_idx, errors) in other.rows {
68 self.add_errors(row_idx, errors);
69 }
70 }
71
72 pub fn accept_rows(&self) -> Vec<bool> {
73 let mut accept_rows = vec![true; self.row_count];
74 for row_idx in self.rows.keys() {
75 if let Some(slot) = accept_rows.get_mut(*row_idx) {
76 *slot = false;
77 }
78 }
79 accept_rows
80 }
81
82 pub fn build_errors_formatted(&self, formatter: &dyn RowErrorFormatter) -> Vec<Option<String>> {
83 let mut errors_out = vec![None; self.row_count];
84 for (row_idx, errors) in &self.rows {
85 if let Some(slot) = errors_out.get_mut(*row_idx) {
86 *slot = Some(formatter.format(errors));
87 }
88 }
89 errors_out
90 }
91
92 pub fn iter(&self) -> impl Iterator<Item = (&usize, &Vec<RowError>)> {
93 self.rows.iter()
94 }
95
96 pub fn error_row_count(&self) -> u64 {
97 self.rows.len() as u64
98 }
99
100 pub fn violation_count(&self) -> u64 {
101 self.rows.values().map(|errors| errors.len() as u64).sum()
102 }
103}
104
105impl RowError {
106 pub fn new(rule: &str, column: &str, message: &str) -> Self {
107 Self {
108 rule: rule.to_string(),
109 column: column.to_string(),
110 message: message.to_string(),
111 }
112 }
113
114 pub fn to_json(&self) -> String {
115 self.to_json_with_source(None)
116 }
117
118 pub fn to_json_with_source(&self, source: Option<&str>) -> String {
119 match source {
120 Some(source) => format!(
121 "{{\"rule\":\"{}\",\"column\":\"{}\",\"source\":\"{}\",\"message\":\"{}\"}}",
122 json_escape(&self.rule),
123 json_escape(&self.column),
124 json_escape(source),
125 json_escape(&self.message)
126 ),
127 None => format!(
128 "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
129 json_escape(&self.rule),
130 json_escape(&self.column),
131 json_escape(&self.message)
132 ),
133 }
134 }
135}
136
137pub trait RowErrorFormatter {
138 fn format(&self, errors: &[RowError]) -> String;
139}
140
/// Formats the errors of a row as a JSON array of error objects.
///
/// Derives `Debug` and `Clone` in addition to `Default`, in line with the
/// other public data types in this module.
#[derive(Debug, Clone, Default)]
pub struct JsonRowErrorFormatter {
    /// Optional map looked up by error column name; a hit is emitted as the
    /// `source` value of that error.
    source_map: Option<HashMap<String, String>>,
}
/// Formats the errors of a row as CSV lines, one line per error.
///
/// Derives `Debug` and `Clone` in addition to `Default`, in line with the
/// other public data types in this module.
#[derive(Debug, Clone, Default)]
pub struct CsvRowErrorFormatter {
    /// Optional map looked up by error column name; when the map is present a
    /// `source` field is added to every line (empty if the column is not in the map).
    source_map: Option<HashMap<String, String>>,
}
/// Formats the errors of a row as a single `; `-separated line of text.
///
/// Derives `Debug` and `Clone` in addition to `Default`, in line with the
/// other public data types in this module.
#[derive(Debug, Clone, Default)]
pub struct TextRowErrorFormatter {
    /// Optional map looked up by error column name; a hit adds a
    /// `source=<value>` segment to that error's text.
    source_map: Option<HashMap<String, String>>,
}
153
154impl RowErrorFormatter for JsonRowErrorFormatter {
155 fn format(&self, errors: &[RowError]) -> String {
156 let json_items = errors
157 .iter()
158 .map(|error| {
159 let source = self
160 .source_map
161 .as_ref()
162 .and_then(|map| map.get(&error.column).map(|value| value.as_str()));
163 error.to_json_with_source(source)
164 })
165 .collect::<Vec<_>>()
166 .join(",");
167 format!("[{}]", json_items)
168 }
169}
170
171impl RowErrorFormatter for CsvRowErrorFormatter {
172 fn format(&self, errors: &[RowError]) -> String {
173 let lines = errors
174 .iter()
175 .map(|error| {
176 if let Some(source_map) = self.source_map.as_ref() {
177 let source = source_map
178 .get(&error.column)
179 .map(|value| value.as_str())
180 .unwrap_or("");
181 format!(
182 "{},{},{},{}",
183 csv_escape(&error.rule),
184 csv_escape(&error.column),
185 csv_escape(source),
186 csv_escape(&error.message)
187 )
188 } else {
189 format!(
190 "{},{},{}",
191 csv_escape(&error.rule),
192 csv_escape(&error.column),
193 csv_escape(&error.message)
194 )
195 }
196 })
197 .collect::<Vec<_>>()
198 .join("\n");
199 json_string(&lines)
200 }
201}
202
203impl RowErrorFormatter for TextRowErrorFormatter {
204 fn format(&self, errors: &[RowError]) -> String {
205 let text = errors
206 .iter()
207 .map(|error| {
208 if let Some(source_map) = self.source_map.as_ref() {
209 if let Some(source) = source_map.get(&error.column) {
210 return format!(
211 "{}:{} source={} {}",
212 error.rule, error.column, source, error.message
213 );
214 }
215 }
216 format!("{}:{} {}", error.rule, error.column, error.message)
217 })
218 .collect::<Vec<_>>()
219 .join("; ");
220 json_string(&text)
221 }
222}
223
224pub fn row_error_formatter(
225 name: &str,
226 source_map: Option<&HashMap<String, String>>,
227) -> FloeResult<Box<dyn RowErrorFormatter>> {
228 match name {
229 "json" => Ok(Box::new(JsonRowErrorFormatter {
230 source_map: source_map.cloned(),
231 })),
232 "csv" => Ok(Box::new(CsvRowErrorFormatter {
233 source_map: source_map.cloned(),
234 })),
235 "text" => Ok(Box::new(TextRowErrorFormatter {
236 source_map: source_map.cloned(),
237 })),
238 other => Err(Box::new(ConfigError(format!(
239 "unsupported report.formatter: {other}"
240 )))),
241 }
242}
243
244pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
245 let mut accept_rows = Vec::with_capacity(errors_per_row.len());
246 for errors in errors_per_row {
247 accept_rows.push(errors.is_empty());
248 }
249 accept_rows
250}
251
252pub fn build_errors_json(
253 errors_per_row: &[Vec<RowError>],
254 accept_rows: &[bool],
255) -> Vec<Option<String>> {
256 let formatter = JsonRowErrorFormatter { source_map: None };
257 build_errors_formatted(errors_per_row, accept_rows, &formatter)
258}
259
260pub fn build_errors_formatted(
261 errors_per_row: &[Vec<RowError>],
262 accept_rows: &[bool],
263 formatter: &dyn RowErrorFormatter,
264) -> Vec<Option<String>> {
265 let mut errors_out = Vec::with_capacity(errors_per_row.len());
266 for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
267 if *accepted {
268 errors_out.push(None);
269 continue;
270 }
271 errors_out.push(Some(formatter.format(errors)));
272 }
273 errors_out
274}
275
276pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
277 let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
278 let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
279 let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
280 (accept_mask, reject_mask)
281}
282
283pub fn rejected_error_columns(
284 errors_per_row: &[Option<String>],
285 include_all_rows: bool,
286) -> (Series, Series) {
287 if include_all_rows {
288 let mut row_index = Vec::with_capacity(errors_per_row.len());
289 let mut errors = Vec::with_capacity(errors_per_row.len());
290 for (idx, err) in errors_per_row.iter().enumerate() {
291 row_index.push(idx as u64);
292 errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
293 }
294 (
295 Series::new("__floe_row_index".into(), row_index),
296 Series::new("__floe_errors".into(), errors),
297 )
298 } else {
299 let mut row_index = Vec::new();
300 let mut errors = Vec::new();
301 for (idx, err) in errors_per_row.iter().enumerate() {
302 if let Some(err) = err {
303 row_index.push(idx as u64);
304 errors.push(err.clone());
305 }
306 }
307 (
308 Series::new("__floe_row_index".into(), row_index),
309 Series::new("__floe_errors".into(), errors),
310 )
311 }
312}
313
/// Escapes `value` so it can be placed between double quotes in a JSON document.
///
/// Backslash, double quote, newline, carriage return and tab use their short
/// escapes, as do backspace (`\b`) and form feed (`\f`). Every other control
/// character below U+0020 is written as `\u00XX`, because JSON does not allow
/// raw control characters inside strings. All other characters are copied
/// unchanged. The surrounding quotes are not added.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{0008}' => escaped.push_str("\\b"),
            '\u{000C}' => escaped.push_str("\\f"),
            // Remaining C0 controls have no short form and must use \uXXXX.
            control if (control as u32) < 0x20 => {
                escaped.push_str(&format!("\\u{:04x}", control as u32));
            }
            other => escaped.push(other),
        }
    }
    escaped
}
322
323fn json_string(value: &str) -> String {
324 format!("\"{}\"", json_escape(value))
325}
326
/// Escapes `value` for use as a single CSV field.
///
/// A field containing a double quote, comma, line feed or carriage return is
/// wrapped in double quotes, with embedded double quotes doubled. Any other
/// field is returned unchanged.
///
/// A field that contained a quote but no comma or newline used to have its
/// quotes doubled without being wrapped, which is not a valid quoted field;
/// such fields are now wrapped as well.
fn csv_escape(value: &str) -> String {
    let needs_quoting = value
        .chars()
        .any(|ch| matches!(ch, '"' | ',' | '\n' | '\r'));
    if needs_quoting {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}