1use std::{collections::HashMap, path::Path};
2
3use anyhow::{Context, Result, anyhow};
4use log::info;
5
6use crate::{
7 cli::SchemaVerifyArgs,
8 data::parse_typed_value,
9 io_utils,
10 schema::{ColumnType, Schema},
11 table,
12};
13
14pub fn execute(args: &SchemaVerifyArgs) -> Result<()> {
15 let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
16 let schema = Schema::load(&args.schema)
17 .with_context(|| format!("Loading schema from {schema:?}", schema = args.schema))?;
18 let report_config = args
19 .report_invalid
20 .as_ref()
21 .map(|values| parse_report_invalid_options(values))
22 .transpose()?;
23 for input in &args.inputs {
24 let delimiter = io_utils::resolve_input_delimiter(input, args.delimiter);
25 validate_file_against_schema(&schema, input, delimiter, input_encoding, report_config)?;
26 info!("✓ {input:?} matches schema");
27 }
28 Ok(())
29}
30
/// One value that failed schema validation, captured for the detail table of the report.
#[derive(Debug, Clone)]
struct InvalidEntry {
    /// Row number reported to the user for the record containing the value.
    row_number: usize,
    /// Output name of the schema column the value belongs to.
    column_name: String,
    /// Datatype the schema declares for that column.
    datatype: ColumnType,
    /// Value as decoded from the file, before any transformations or replacements.
    raw_value: String,
    /// Value after transformations/replacements; `Some` only when it differs from `raw_value`.
    normalized_value: Option<String>,
    /// Text of the validation error explaining why the value was rejected.
    reason: String,
}
40
/// Settings parsed from `--report-invalid` that control what the invalid-value report prints.
#[derive(Debug, Clone, Copy)]
struct InvalidReportOptions {
    /// Print the table listing individual invalid values.
    show_detail: bool,
    /// Print the per-column table of violation counts.
    show_summary: bool,
    /// Maximum number of invalid values collected for the detail table; `None` means no cap.
    limit: Option<usize>,
}
47
/// Running tally of schema violations for a single column.
#[derive(Debug, Clone)]
struct ColumnSummary {
    /// Datatype the schema declares for the column.
    datatype: ColumnType,
    /// Number of invalid values seen in the column so far.
    count: usize,
}
53
54fn parse_report_invalid_options(values: &[String]) -> Result<InvalidReportOptions> {
55 let mut show_detail = false;
56 let mut show_summary = false;
57 let mut limit = None;
58
59 for token in values {
60 if token.is_empty() {
61 continue;
62 }
63 let lowered = token.to_ascii_lowercase();
64 match lowered.as_str() {
65 "detail" => {
66 show_detail = true;
67 }
68 "summary" => {
69 show_summary = true;
70 }
71 _ => {
72 if let Ok(parsed) = token.parse::<usize>() {
73 limit = Some(parsed);
74 if !show_detail && !show_summary {
75 show_detail = true;
76 }
77 } else {
78 return Err(anyhow!(
79 "Invalid value '{token}' for --report-invalid; expected 'detail', 'summary', or a positive integer limit"
80 ));
81 }
82 }
83 }
84 }
85
86 if !show_detail && !show_summary {
87 show_summary = true;
88 }
89
90 Ok(InvalidReportOptions {
91 show_detail,
92 show_summary,
93 limit,
94 })
95}
96
97fn validate_file_against_schema(
98 schema: &Schema,
99 path: &Path,
100 delimiter: u8,
101 encoding: &'static encoding_rs::Encoding,
102 report: Option<InvalidReportOptions>,
103) -> Result<()> {
104 let mut reader =
105 io_utils::open_csv_reader_from_path(path, delimiter, schema.expects_headers())?;
106 if schema.expects_headers() {
107 let headers = io_utils::reader_headers(&mut reader, encoding)?;
108 schema
109 .validate_headers(&headers)
110 .map_err(|err| anyhow!("Validating headers for {path:?}: {err}"))?;
111 }
112
113 let report_cfg = report;
114 let detail_enabled = report_cfg.is_some_and(|cfg| cfg.show_detail);
115 let summary_enabled = report_cfg.is_some_and(|cfg| cfg.show_summary);
116 let report_enabled = detail_enabled || summary_enabled;
117 let collection_limit = if detail_enabled {
118 report_cfg.and_then(|cfg| cfg.limit).unwrap_or(usize::MAX)
119 } else {
120 0
121 };
122
123 let mut invalid_entries = Vec::new();
124 let mut column_summary: HashMap<String, ColumnSummary> = HashMap::new();
125 let mut total_errors = 0usize;
126
127 for (row_idx, record) in reader.byte_records().enumerate() {
128 let record = record.with_context(|| format!("Reading row {} in {path:?}", row_idx + 2))?;
129 let decoded = io_utils::decode_record(&record, encoding)?;
130 let mut transformed = decoded.clone();
131 if schema.has_transformations() {
132 schema
133 .apply_transformations_to_row(&mut transformed)
134 .with_context(|| {
135 format!(
136 "Applying datatype mappings to row {} in {path:?}",
137 row_idx + 2
138 )
139 })?;
140 }
141 schema.apply_replacements_to_row(&mut transformed);
142 for (col_idx, column) in schema.columns.iter().enumerate() {
143 let raw_value = decoded.get(col_idx).map(|s| s.as_str()).unwrap_or("");
144 let normalized_value = transformed.get(col_idx).map(|s| s.as_str()).unwrap_or("");
145 if let Err(err) = validate_value(normalized_value, &column.datatype) {
146 if !report_enabled {
147 let message = if normalized_value == raw_value {
148 format!(
149 "Row {} column '{}': value {:?}\nReason: {}",
150 row_idx + 2,
151 column.output_name(),
152 raw_value,
153 err
154 )
155 } else {
156 format!(
157 "Row {} column '{}': value {:?} (normalized {:?})\nReason: {}",
158 row_idx + 2,
159 column.output_name(),
160 raw_value,
161 normalized_value,
162 err
163 )
164 };
165 return Err(anyhow!(message));
166 }
167
168 let normalized_owned = normalized_value.to_string();
169 let normalized_changed = normalized_owned != raw_value;
170
171 total_errors += 1;
172 if detail_enabled && invalid_entries.len() < collection_limit {
173 let normalized_value = if normalized_changed {
174 Some(normalized_owned.clone())
175 } else {
176 None
177 };
178 invalid_entries.push(InvalidEntry {
179 row_number: row_idx + 2,
180 column_name: column.output_name().to_string(),
181 datatype: column.datatype.clone(),
182 raw_value: raw_value.to_string(),
183 normalized_value,
184 reason: err.to_string(),
185 });
186 }
187
188 column_summary
189 .entry(column.output_name().to_string())
190 .and_modify(|summary| summary.count += 1)
191 .or_insert_with(|| ColumnSummary {
192 datatype: column.datatype.clone(),
193 count: 1,
194 });
195 }
196 }
197 }
198
199 if report_enabled && total_errors > 0 {
200 if let Some(cfg) = report_cfg {
201 print_invalid_report(path, &invalid_entries, &column_summary, total_errors, cfg);
202 }
203 return Err(anyhow!(format!(
204 "Found {total_errors} invalid value(s) in {path:?}"
205 )));
206 }
207
208 Ok(())
209}
210
211fn validate_value(value: &str, column_type: &ColumnType) -> Result<()> {
212 if value.is_empty() {
213 return Ok(());
214 }
215 parse_typed_value(value, column_type).map(|_| ())
216}
217
218fn print_invalid_report(
219 path: &Path,
220 entries: &[InvalidEntry],
221 column_summary: &HashMap<String, ColumnSummary>,
222 total_errors: usize,
223 options: InvalidReportOptions,
224) {
225 println!();
226
227 if options.show_detail {
228 println!("Invalid rows in {}:", path.display());
229
230 let displayed = entries.len();
231
232 if entries.is_empty() {
233 println!(
234 "No sample rows captured; use '--report-invalid:detail <LIMIT>' with a higher LIMIT to display examples."
235 );
236 } else {
237 let headers = vec![
238 "row".to_string(),
239 "column".to_string(),
240 "raw".to_string(),
241 "value".to_string(),
242 "datatype".to_string(),
243 "reason".to_string(),
244 ];
245
246 let rows = entries
247 .iter()
248 .map(|entry| {
249 let highlight_target =
250 entry.normalized_value.as_ref().unwrap_or(&entry.raw_value);
251 let highlighted = highlight_red(highlight_target);
252
253 vec![
254 entry.row_number.to_string(),
255 entry.column_name.clone(),
256 entry.raw_value.clone(),
257 highlighted,
258 entry.datatype.to_string(),
259 entry.reason.clone(),
260 ]
261 })
262 .collect::<Vec<_>>();
263
264 table::print_table(&headers, &rows);
265 }
266
267 if total_errors > displayed {
268 println!();
269 let limit_text = match options.limit {
270 None => "no limit".to_string(),
271 Some(value) => value.to_string(),
272 };
273 println!(
274 "Displayed {displayed} of {total_errors} invalid row(s) (limit: {limit_text})."
275 );
276 }
277
278 if options.show_summary && !column_summary.is_empty() {
279 println!();
280 }
281 }
282
283 if options.show_summary && !column_summary.is_empty() {
284 println!("Columns with schema violations:");
285 let mut summary_rows = column_summary
286 .iter()
287 .map(|(name, summary)| {
288 vec![
289 name.clone(),
290 summary.datatype.to_string(),
291 summary.count.to_string(),
292 ]
293 })
294 .collect::<Vec<_>>();
295 summary_rows.sort_by(|a, b| a[0].cmp(&b[0]));
296
297 let headers = vec![
298 "column".to_string(),
299 "datatype".to_string(),
300 "errors".to_string(),
301 ];
302 table::print_table(&headers, &summary_rows);
303 println!();
304 } else if options.show_detail {
305 println!();
306 }
307}
308
/// Returns `value` wrapped in the ANSI escape sequence for red foreground text, followed by
/// the sequence that resets terminal attributes.
fn highlight_red(value: &str) -> String {
    const ANSI_RED: &str = "\u{1b}[31m";
    const ANSI_RESET: &str = "\u{1b}[0m";

    let mut painted = String::with_capacity(ANSI_RED.len() + value.len() + ANSI_RESET.len());
    painted.push_str(ANSI_RED);
    painted.push_str(value);
    painted.push_str(ANSI_RESET);
    painted
}