1mod cast;
2mod mismatch;
3pub mod normalize;
4mod not_null;
5mod unique;
6
7use polars::prelude::{BooleanChunked, DataFrame, NamedFrom, NewChunkedArray, Series};
8use std::collections::{BTreeMap, HashMap};
9
10use crate::{ConfigError, FloeResult};
11
12pub use cast::{cast_mismatch_counts, cast_mismatch_errors, cast_mismatch_errors_sparse};
13pub use mismatch::{
14 apply_mismatch_plan, apply_schema_mismatch, plan_schema_mismatch, resolve_mismatch_columns,
15 top_level_declared_columns, MismatchOutcome,
16};
17pub use not_null::{not_null_counts, not_null_errors, not_null_errors_sparse};
18pub use unique::{
19 resolve_schema_unique_keys, unique_counts, unique_errors, unique_errors_sparse,
20 UniqueConstraint, UniqueConstraintResult, UniqueTracker,
21};
22
23pub type ColumnIndex = HashMap<String, usize>;
24
25pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
26 df.get_column_names()
27 .iter()
28 .enumerate()
29 .map(|(idx, name)| (name.to_string(), idx))
30 .collect()
31}
32
/// A single validation failure recorded against one row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Name of the rule that failed (reported as the `rule` field).
    pub rule: String,
    /// Column the failure refers to; also the key used for source-map lookups.
    pub column: String,
    /// Free-form description of the failure.
    pub message: String,
}
39
40#[derive(Debug, Clone, Default)]
41pub struct SparseRowErrors {
42 row_count: usize,
43 rows: BTreeMap<usize, Vec<RowError>>,
44}
45
46impl SparseRowErrors {
47 pub fn new(row_count: usize) -> Self {
48 Self {
49 row_count,
50 rows: BTreeMap::new(),
51 }
52 }
53
54 pub fn is_empty(&self) -> bool {
55 self.rows.is_empty()
56 }
57
58 pub fn add_error(&mut self, row_idx: usize, error: RowError) {
59 self.rows.entry(row_idx).or_default().push(error);
60 }
61
62 pub fn add_errors(&mut self, row_idx: usize, errors: Vec<RowError>) {
63 if errors.is_empty() {
64 return;
65 }
66 self.rows.entry(row_idx).or_default().extend(errors);
67 }
68
69 pub fn merge(&mut self, other: SparseRowErrors) {
70 for (row_idx, errors) in other.rows {
71 self.add_errors(row_idx, errors);
72 }
73 }
74
75 pub fn accept_rows(&self) -> Vec<bool> {
76 let mut accept_rows = vec![true; self.row_count];
77 for row_idx in self.rows.keys() {
78 if let Some(slot) = accept_rows.get_mut(*row_idx) {
79 *slot = false;
80 }
81 }
82 accept_rows
83 }
84
85 pub fn build_errors_formatted(&self, formatter: &dyn RowErrorFormatter) -> Vec<Option<String>> {
86 let mut errors_out = vec![None; self.row_count];
87 for (row_idx, errors) in &self.rows {
88 if let Some(slot) = errors_out.get_mut(*row_idx) {
89 *slot = Some(formatter.format(errors));
90 }
91 }
92 errors_out
93 }
94
95 pub fn iter(&self) -> impl Iterator<Item = (&usize, &Vec<RowError>)> {
96 self.rows.iter()
97 }
98
99 pub fn error_row_count(&self) -> u64 {
100 self.rows.len() as u64
101 }
102
103 pub fn violation_count(&self) -> u64 {
104 self.rows.values().map(|errors| errors.len() as u64).sum()
105 }
106}
107
108impl RowError {
109 pub fn new(rule: &str, column: &str, message: &str) -> Self {
110 Self {
111 rule: rule.to_string(),
112 column: column.to_string(),
113 message: message.to_string(),
114 }
115 }
116
117 pub fn to_json(&self) -> String {
118 self.to_json_with_source(None)
119 }
120
121 pub fn to_json_with_source(&self, source: Option<&str>) -> String {
122 match source {
123 Some(source) => format!(
124 "{{\"rule\":\"{}\",\"column\":\"{}\",\"source\":\"{}\",\"message\":\"{}\"}}",
125 json_escape(&self.rule),
126 json_escape(&self.column),
127 json_escape(source),
128 json_escape(&self.message)
129 ),
130 None => format!(
131 "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
132 json_escape(&self.rule),
133 json_escape(&self.column),
134 json_escape(&self.message)
135 ),
136 }
137 }
138}
139
140pub trait RowErrorFormatter {
141 fn format(&self, errors: &[RowError]) -> String;
142}
143
/// Renders a row's errors as a JSON array of error objects.
#[derive(Default)]
pub struct JsonRowErrorFormatter {
    /// Optional column-name -> value map; a hit adds a `source` field to the object.
    source_map: Option<HashMap<String, String>>,
}

/// Renders a row's errors as CSV lines, wrapped in a JSON string literal.
#[derive(Default)]
pub struct CsvRowErrorFormatter {
    /// Optional column-name -> value map; when set, every line gains a `source`
    /// field (empty when the column has no entry).
    source_map: Option<HashMap<String, String>>,
}

/// Renders a row's errors as `; `-separated text, wrapped in a JSON string literal.
#[derive(Default)]
pub struct TextRowErrorFormatter {
    /// Optional column-name -> value map; a hit adds `source=<value>` to the entry.
    source_map: Option<HashMap<String, String>>,
}
156
157impl RowErrorFormatter for JsonRowErrorFormatter {
158 fn format(&self, errors: &[RowError]) -> String {
159 let json_items = errors
160 .iter()
161 .map(|error| {
162 let source = self
163 .source_map
164 .as_ref()
165 .and_then(|map| map.get(&error.column).map(|value| value.as_str()));
166 error.to_json_with_source(source)
167 })
168 .collect::<Vec<_>>()
169 .join(",");
170 format!("[{}]", json_items)
171 }
172}
173
174impl RowErrorFormatter for CsvRowErrorFormatter {
175 fn format(&self, errors: &[RowError]) -> String {
176 let lines = errors
177 .iter()
178 .map(|error| {
179 if let Some(source_map) = self.source_map.as_ref() {
180 let source = source_map
181 .get(&error.column)
182 .map(|value| value.as_str())
183 .unwrap_or("");
184 format!(
185 "{},{},{},{}",
186 csv_escape(&error.rule),
187 csv_escape(&error.column),
188 csv_escape(source),
189 csv_escape(&error.message)
190 )
191 } else {
192 format!(
193 "{},{},{}",
194 csv_escape(&error.rule),
195 csv_escape(&error.column),
196 csv_escape(&error.message)
197 )
198 }
199 })
200 .collect::<Vec<_>>()
201 .join("\n");
202 json_string(&lines)
203 }
204}
205
206impl RowErrorFormatter for TextRowErrorFormatter {
207 fn format(&self, errors: &[RowError]) -> String {
208 let text = errors
209 .iter()
210 .map(|error| {
211 if let Some(source_map) = self.source_map.as_ref() {
212 if let Some(source) = source_map.get(&error.column) {
213 return format!(
214 "{}:{} source={} {}",
215 error.rule, error.column, source, error.message
216 );
217 }
218 }
219 format!("{}:{} {}", error.rule, error.column, error.message)
220 })
221 .collect::<Vec<_>>()
222 .join("; ");
223 json_string(&text)
224 }
225}
226
227pub fn row_error_formatter(
228 name: &str,
229 source_map: Option<&HashMap<String, String>>,
230) -> FloeResult<Box<dyn RowErrorFormatter>> {
231 match name {
232 "json" => Ok(Box::new(JsonRowErrorFormatter {
233 source_map: source_map.cloned(),
234 })),
235 "csv" => Ok(Box::new(CsvRowErrorFormatter {
236 source_map: source_map.cloned(),
237 })),
238 "text" => Ok(Box::new(TextRowErrorFormatter {
239 source_map: source_map.cloned(),
240 })),
241 other => Err(Box::new(ConfigError(format!(
242 "unsupported report.formatter: {other}"
243 )))),
244 }
245}
246
247pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
248 let mut accept_rows = Vec::with_capacity(errors_per_row.len());
249 for errors in errors_per_row {
250 accept_rows.push(errors.is_empty());
251 }
252 accept_rows
253}
254
255pub fn build_errors_json(
256 errors_per_row: &[Vec<RowError>],
257 accept_rows: &[bool],
258) -> Vec<Option<String>> {
259 let formatter = JsonRowErrorFormatter { source_map: None };
260 build_errors_formatted(errors_per_row, accept_rows, &formatter)
261}
262
263pub fn build_errors_formatted(
264 errors_per_row: &[Vec<RowError>],
265 accept_rows: &[bool],
266 formatter: &dyn RowErrorFormatter,
267) -> Vec<Option<String>> {
268 let mut errors_out = Vec::with_capacity(errors_per_row.len());
269 for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
270 if *accepted {
271 errors_out.push(None);
272 continue;
273 }
274 errors_out.push(Some(formatter.format(errors)));
275 }
276 errors_out
277}
278
279pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
280 let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
281 let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
282 let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
283 (accept_mask, reject_mask)
284}
285
286pub fn rejected_error_columns(
287 errors_per_row: &[Option<String>],
288 include_all_rows: bool,
289) -> (Series, Series) {
290 if include_all_rows {
291 let mut row_index = Vec::with_capacity(errors_per_row.len());
292 let mut errors = Vec::with_capacity(errors_per_row.len());
293 for (idx, err) in errors_per_row.iter().enumerate() {
294 row_index.push(idx as u64);
295 errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
296 }
297 (
298 Series::new("__floe_row_index".into(), row_index),
299 Series::new("__floe_errors".into(), errors),
300 )
301 } else {
302 let mut row_index = Vec::new();
303 let mut errors = Vec::new();
304 for (idx, err) in errors_per_row.iter().enumerate() {
305 if let Some(err) = err {
306 row_index.push(idx as u64);
307 errors.push(err.clone());
308 }
309 }
310 (
311 Series::new("__floe_row_index".into(), row_index),
312 Series::new("__floe_errors".into(), errors),
313 )
314 }
315}
316
/// Escapes `value` for use inside a JSON string literal (quotes not included).
///
/// Backslash, double quote, and every control character in U+0000..=U+001F are
/// escaped, as JSON (RFC 8259 §7) requires. `\n`, `\r`, and `\t` use their short
/// forms; the remaining control characters use `\uXXXX`. All other characters,
/// including non-ASCII, pass through unchanged.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            // Raw control characters are not allowed inside JSON strings; emitting
            // them verbatim would produce output that JSON parsers reject.
            c if c < '\u{20}' => escaped.push_str(&format!("\\u{:04x}", u32::from(c))),
            c => escaped.push(c),
        }
    }
    escaped
}

/// Wraps `value` in double quotes as a complete JSON string literal, escaping it
/// with `json_escape`.
fn json_string(value: &str) -> String {
    format!("\"{}\"", json_escape(value))
}
329
/// Escapes `value` as a single CSV field per RFC 4180.
///
/// A field containing a comma, double quote, CR, or LF is wrapped in double quotes,
/// and any embedded quotes are doubled. Other fields are returned unchanged.
/// Quoting fields that contain quotes is required for the doubled `""` to be read
/// back as a literal quote.
fn csv_escape(value: &str) -> String {
    let needs_quoting = value.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r'));
    if needs_quoting {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}