1mod cast;
2mod mismatch;
3pub mod normalize;
4mod not_null;
5mod unique;
6
7use polars::prelude::{BooleanChunked, DataFrame, NamedFrom, NewChunkedArray, Series};
8use std::collections::{BTreeMap, HashMap};
9
10use crate::{ConfigError, FloeResult};
11
12pub use cast::{cast_mismatch_counts, cast_mismatch_errors, cast_mismatch_errors_sparse};
13pub use mismatch::{
14 apply_mismatch_plan, apply_schema_mismatch, plan_schema_mismatch, MismatchOutcome,
15};
16pub use not_null::{not_null_counts, not_null_errors, not_null_errors_sparse};
17pub use unique::{unique_counts, unique_errors, unique_errors_sparse, UniqueTracker};
18
/// Lookup table from a column name to a `usize` index, as produced by
/// `column_index_map`.
pub type ColumnIndex = HashMap<String, usize>;
20
21pub fn column_index_map(df: &DataFrame) -> ColumnIndex {
22 df.get_column_names()
23 .iter()
24 .enumerate()
25 .map(|(idx, name)| (name.to_string(), idx))
26 .collect()
27}
28
/// One error entry attached to a row: the rule, the column and a message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Rule identifier; serialized under the `rule` key by `to_json`.
    pub rule: String,
    /// Column name; serialized under the `column` key by `to_json`.
    pub column: String,
    /// Free-form message; serialized under the `message` key by `to_json`.
    pub message: String,
}
35
36#[derive(Debug, Clone, Default)]
37pub struct SparseRowErrors {
38 row_count: usize,
39 rows: BTreeMap<usize, Vec<RowError>>,
40}
41
42impl SparseRowErrors {
43 pub fn new(row_count: usize) -> Self {
44 Self {
45 row_count,
46 rows: BTreeMap::new(),
47 }
48 }
49
50 pub fn is_empty(&self) -> bool {
51 self.rows.is_empty()
52 }
53
54 pub fn add_error(&mut self, row_idx: usize, error: RowError) {
55 self.rows.entry(row_idx).or_default().push(error);
56 }
57
58 pub fn add_errors(&mut self, row_idx: usize, errors: Vec<RowError>) {
59 if errors.is_empty() {
60 return;
61 }
62 self.rows.entry(row_idx).or_default().extend(errors);
63 }
64
65 pub fn merge(&mut self, other: SparseRowErrors) {
66 for (row_idx, errors) in other.rows {
67 self.add_errors(row_idx, errors);
68 }
69 }
70
71 pub fn accept_rows(&self) -> Vec<bool> {
72 let mut accept_rows = vec![true; self.row_count];
73 for row_idx in self.rows.keys() {
74 if let Some(slot) = accept_rows.get_mut(*row_idx) {
75 *slot = false;
76 }
77 }
78 accept_rows
79 }
80
81 pub fn build_errors_formatted(&self, formatter: &dyn RowErrorFormatter) -> Vec<Option<String>> {
82 let mut errors_out = vec![None; self.row_count];
83 for (row_idx, errors) in &self.rows {
84 if let Some(slot) = errors_out.get_mut(*row_idx) {
85 *slot = Some(formatter.format(errors));
86 }
87 }
88 errors_out
89 }
90
91 pub fn iter(&self) -> impl Iterator<Item = (&usize, &Vec<RowError>)> {
92 self.rows.iter()
93 }
94
95 pub fn error_row_count(&self) -> u64 {
96 self.rows.len() as u64
97 }
98
99 pub fn violation_count(&self) -> u64 {
100 self.rows.values().map(|errors| errors.len() as u64).sum()
101 }
102}
103
104impl RowError {
105 pub fn new(rule: &str, column: &str, message: &str) -> Self {
106 Self {
107 rule: rule.to_string(),
108 column: column.to_string(),
109 message: message.to_string(),
110 }
111 }
112
113 pub fn to_json(&self) -> String {
114 format!(
115 "{{\"rule\":\"{}\",\"column\":\"{}\",\"message\":\"{}\"}}",
116 json_escape(&self.rule),
117 json_escape(&self.column),
118 json_escape(&self.message)
119 )
120 }
121}
122
/// Strategy for turning the errors of one row into a single string.
pub trait RowErrorFormatter {
    /// Formats `errors` into one string.
    fn format(&self, errors: &[RowError]) -> String;
}
126
/// Formatter that emits a JSON array with one object per error.
pub struct JsonRowErrorFormatter;
/// Formatter that emits one `rule,column,message` CSV line per error, with
/// the joined lines wrapped as a JSON string.
pub struct CsvRowErrorFormatter;
/// Formatter that emits `rule:column message` entries joined with `; `, with
/// the result wrapped as a JSON string.
pub struct TextRowErrorFormatter;
130
131impl RowErrorFormatter for JsonRowErrorFormatter {
132 fn format(&self, errors: &[RowError]) -> String {
133 let json_items = errors
134 .iter()
135 .map(RowError::to_json)
136 .collect::<Vec<_>>()
137 .join(",");
138 format!("[{}]", json_items)
139 }
140}
141
142impl RowErrorFormatter for CsvRowErrorFormatter {
143 fn format(&self, errors: &[RowError]) -> String {
144 let lines = errors
145 .iter()
146 .map(|error| {
147 format!(
148 "{},{},{}",
149 csv_escape(&error.rule),
150 csv_escape(&error.column),
151 csv_escape(&error.message)
152 )
153 })
154 .collect::<Vec<_>>()
155 .join("\n");
156 json_string(&lines)
157 }
158}
159
160impl RowErrorFormatter for TextRowErrorFormatter {
161 fn format(&self, errors: &[RowError]) -> String {
162 let text = errors
163 .iter()
164 .map(|error| format!("{}:{} {}", error.rule, error.column, error.message))
165 .collect::<Vec<_>>()
166 .join("; ");
167 json_string(&text)
168 }
169}
170
171pub fn row_error_formatter(name: &str) -> FloeResult<Box<dyn RowErrorFormatter>> {
172 match name {
173 "json" => Ok(Box::new(JsonRowErrorFormatter)),
174 "csv" => Ok(Box::new(CsvRowErrorFormatter)),
175 "text" => Ok(Box::new(TextRowErrorFormatter)),
176 other => Err(Box::new(ConfigError(format!(
177 "unsupported report.formatter: {other}"
178 )))),
179 }
180}
181
182pub fn build_accept_rows(errors_per_row: &[Vec<RowError>]) -> Vec<bool> {
183 let mut accept_rows = Vec::with_capacity(errors_per_row.len());
184 for errors in errors_per_row {
185 accept_rows.push(errors.is_empty());
186 }
187 accept_rows
188}
189
190pub fn build_errors_json(
191 errors_per_row: &[Vec<RowError>],
192 accept_rows: &[bool],
193) -> Vec<Option<String>> {
194 build_errors_formatted(errors_per_row, accept_rows, &JsonRowErrorFormatter)
195}
196
197pub fn build_errors_formatted(
198 errors_per_row: &[Vec<RowError>],
199 accept_rows: &[bool],
200 formatter: &dyn RowErrorFormatter,
201) -> Vec<Option<String>> {
202 let mut errors_out = Vec::with_capacity(errors_per_row.len());
203 for (errors, accepted) in errors_per_row.iter().zip(accept_rows.iter()) {
204 if *accepted {
205 errors_out.push(None);
206 continue;
207 }
208 errors_out.push(Some(formatter.format(errors)));
209 }
210 errors_out
211}
212
213pub fn build_row_masks(accept_rows: &[bool]) -> (BooleanChunked, BooleanChunked) {
214 let reject_rows: Vec<bool> = accept_rows.iter().map(|accepted| !*accepted).collect();
215 let accept_mask = BooleanChunked::from_slice("floe_accept".into(), accept_rows);
216 let reject_mask = BooleanChunked::from_slice("floe_reject".into(), &reject_rows);
217 (accept_mask, reject_mask)
218}
219
220pub fn rejected_error_columns(
221 errors_per_row: &[Option<String>],
222 include_all_rows: bool,
223) -> (Series, Series) {
224 if include_all_rows {
225 let mut row_index = Vec::with_capacity(errors_per_row.len());
226 let mut errors = Vec::with_capacity(errors_per_row.len());
227 for (idx, err) in errors_per_row.iter().enumerate() {
228 row_index.push(idx as u64);
229 errors.push(err.clone().unwrap_or_else(|| "[]".to_string()));
230 }
231 (
232 Series::new("__floe_row_index".into(), row_index),
233 Series::new("__floe_errors".into(), errors),
234 )
235 } else {
236 let mut row_index = Vec::new();
237 let mut errors = Vec::new();
238 for (idx, err) in errors_per_row.iter().enumerate() {
239 if let Some(err) = err {
240 row_index.push(idx as u64);
241 errors.push(err.clone());
242 }
243 }
244 (
245 Series::new("__floe_row_index".into(), row_index),
246 Series::new("__floe_errors".into(), errors),
247 )
248 }
249}
250
/// Escapes `value` so it can be placed between the quotes of a JSON string.
///
/// Backslash and double quote are backslash-escaped; `\n`, `\r` and `\t` use
/// their short escapes; every other control character below U+0020 is written
/// as `\u00XX`, because JSON forbids raw control characters inside strings.
/// All other characters are copied unchanged.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            // Remaining C0 controls have no short form that the original
            // handled; emit the generic six-character escape.
            c if c < '\u{20}' => escaped.push_str(&format!("\\u{:04x}", c as u32)),
            c => escaped.push(c),
        }
    }
    escaped
}
259
260fn json_string(value: &str) -> String {
261 format!("\"{}\"", json_escape(value))
262}
263
/// Escapes a single CSV field.
///
/// A field containing a double quote, comma, line feed or carriage return is
/// wrapped in double quotes with embedded quotes doubled; any other field is
/// returned unchanged. Quote-containing fields must be wrapped as well:
/// doubling the quotes without enclosing the field leaves it ambiguous to a
/// CSV reader.
fn csv_escape(value: &str) -> String {
    let needs_quoting = value.contains(|c: char| matches!(c, '"' | ',' | '\n' | '\r'));
    if needs_quoting {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}