1use std::{collections::HashMap, path::Path};
2
3use anyhow::{Context, Result, anyhow};
4use log::info;
5
6use crate::{
7 cli::VerifyArgs,
8 data::parse_typed_value,
9 io_utils,
10 schema::{ColumnType, Schema},
11 table,
12};
13
14pub fn execute(args: &VerifyArgs) -> Result<()> {
15 let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
16 let schema = Schema::load(&args.schema)
17 .with_context(|| format!("Loading schema from {schema:?}", schema = args.schema))?;
18 let report_config = args
19 .report_invalid
20 .as_ref()
21 .map(|values| parse_report_invalid_options(values))
22 .transpose()?;
23 for input in &args.inputs {
24 let delimiter = io_utils::resolve_input_delimiter(input, args.delimiter);
25 validate_file_against_schema(&schema, input, delimiter, input_encoding, report_config)?;
26 info!("✓ {input:?} matches schema");
27 }
28 Ok(())
29}
30
31#[derive(Debug, Clone)]
32struct InvalidEntry {
33 row_number: usize,
34 column_name: String,
35 datatype: ColumnType,
36 raw_value: String,
37 normalized_value: Option<String>,
38 reason: String,
39}
40
/// Which sections of the invalid-value report to print, as parsed from the
/// `--report-invalid` tokens.
///
/// All fields are plain values, so the type is `Copy` and can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct InvalidReportOptions {
    /// Print the per-value detail table.
    show_detail: bool,
    /// Print the per-column error counts.
    show_summary: bool,
    /// Maximum number of entries captured for the detail table; `None` means unlimited.
    limit: Option<usize>,
}
47
48#[derive(Debug, Clone)]
49struct ColumnSummary {
50 datatype: ColumnType,
51 count: usize,
52}
53
54fn parse_report_invalid_options(values: &[String]) -> Result<InvalidReportOptions> {
55 let mut show_detail = false;
56 let mut show_summary = false;
57 let mut limit = None;
58
59 for token in values {
60 if token.is_empty() {
61 continue;
62 }
63 let lowered = token.to_ascii_lowercase();
64 match lowered.as_str() {
65 "detail" => {
66 show_detail = true;
67 }
68 "summary" => {
69 show_summary = true;
70 }
71 _ => {
72 if let Ok(parsed) = token.parse::<usize>() {
73 limit = Some(parsed);
74 if !show_detail && !show_summary {
75 show_detail = true;
76 }
77 } else {
78 return Err(anyhow!(
79 "Invalid value '{token}' for --report-invalid; expected 'detail', 'summary', or a positive integer limit"
80 ));
81 }
82 }
83 }
84 }
85
86 if !show_detail && !show_summary {
87 show_summary = true;
88 }
89
90 Ok(InvalidReportOptions {
91 show_detail,
92 show_summary,
93 limit,
94 })
95}
96
97fn validate_file_against_schema(
98 schema: &Schema,
99 path: &Path,
100 delimiter: u8,
101 encoding: &'static encoding_rs::Encoding,
102 report: Option<InvalidReportOptions>,
103) -> Result<()> {
104 let mut reader = io_utils::open_csv_reader_from_path(path, delimiter, true)?;
105 let headers = io_utils::reader_headers(&mut reader, encoding)?;
106 schema
107 .validate_headers(&headers)
108 .map_err(|err| anyhow!("Validating headers for {path:?}: {err}"))?;
109
110 let report_cfg = report;
111 let detail_enabled = report_cfg.is_some_and(|cfg| cfg.show_detail);
112 let summary_enabled = report_cfg.is_some_and(|cfg| cfg.show_summary);
113 let report_enabled = detail_enabled || summary_enabled;
114 let collection_limit = if detail_enabled {
115 report_cfg.and_then(|cfg| cfg.limit).unwrap_or(usize::MAX)
116 } else {
117 0
118 };
119
120 let mut invalid_entries = Vec::new();
121 let mut column_summary: HashMap<String, ColumnSummary> = HashMap::new();
122 let mut total_errors = 0usize;
123
124 for (row_idx, record) in reader.byte_records().enumerate() {
125 let record = record.with_context(|| format!("Reading row {} in {path:?}", row_idx + 2))?;
126 let decoded = io_utils::decode_record(&record, encoding)?;
127 for (col_idx, column) in schema.columns.iter().enumerate() {
128 let raw_value = decoded.get(col_idx).map(|s| s.as_str()).unwrap_or("");
129 let normalized = column.normalize_value(raw_value);
130 if let Err(err) = validate_value(normalized.as_ref(), &column.datatype) {
131 if !report_enabled {
132 let message = if normalized.as_ref() == raw_value {
133 format!(
134 "Row {} column '{}': value {:?}\nReason: {}",
135 row_idx + 2,
136 column.output_name(),
137 raw_value,
138 err
139 )
140 } else {
141 format!(
142 "Row {} column '{}': value {:?} (normalized {:?})\nReason: {}",
143 row_idx + 2,
144 column.output_name(),
145 raw_value,
146 normalized.as_ref(),
147 err
148 )
149 };
150 return Err(anyhow!(message));
151 }
152
153 let normalized_owned = normalized.into_owned();
154 let normalized_changed = normalized_owned != raw_value;
155
156 total_errors += 1;
157 if detail_enabled && invalid_entries.len() < collection_limit {
158 let normalized_value = if normalized_changed {
159 Some(normalized_owned.clone())
160 } else {
161 None
162 };
163 invalid_entries.push(InvalidEntry {
164 row_number: row_idx + 2,
165 column_name: column.output_name().to_string(),
166 datatype: column.datatype.clone(),
167 raw_value: raw_value.to_string(),
168 normalized_value,
169 reason: err.to_string(),
170 });
171 }
172
173 column_summary
174 .entry(column.output_name().to_string())
175 .and_modify(|summary| summary.count += 1)
176 .or_insert_with(|| ColumnSummary {
177 datatype: column.datatype.clone(),
178 count: 1,
179 });
180 }
181 }
182 }
183
184 if report_enabled && total_errors > 0 {
185 if let Some(cfg) = report_cfg {
186 print_invalid_report(path, &invalid_entries, &column_summary, total_errors, cfg);
187 }
188 return Err(anyhow!(format!(
189 "Found {total_errors} invalid value(s) in {path:?}"
190 )));
191 }
192
193 Ok(())
194}
195
196fn validate_value(value: &str, column_type: &ColumnType) -> Result<()> {
197 if value.is_empty() {
198 return Ok(());
199 }
200 parse_typed_value(value, column_type).map(|_| ())
201}
202
203fn print_invalid_report(
204 path: &Path,
205 entries: &[InvalidEntry],
206 column_summary: &HashMap<String, ColumnSummary>,
207 total_errors: usize,
208 options: InvalidReportOptions,
209) {
210 println!();
211
212 if options.show_detail {
213 println!("Invalid rows in {}:", path.display());
214
215 let displayed = entries.len();
216
217 if entries.is_empty() {
218 println!(
219 "No sample rows captured; use '--report-invalid:detail <LIMIT>' with a higher LIMIT to display examples."
220 );
221 } else {
222 let headers = vec![
223 "row".to_string(),
224 "column".to_string(),
225 "raw".to_string(),
226 "value".to_string(),
227 "datatype".to_string(),
228 "reason".to_string(),
229 ];
230
231 let rows = entries
232 .iter()
233 .map(|entry| {
234 let highlight_target =
235 entry.normalized_value.as_ref().unwrap_or(&entry.raw_value);
236 let highlighted = highlight_red(highlight_target);
237
238 vec![
239 entry.row_number.to_string(),
240 entry.column_name.clone(),
241 entry.raw_value.clone(),
242 highlighted,
243 entry.datatype.to_string(),
244 entry.reason.clone(),
245 ]
246 })
247 .collect::<Vec<_>>();
248
249 table::print_table(&headers, &rows);
250 }
251
252 if total_errors > displayed {
253 println!();
254 let limit_text = match options.limit {
255 None => "no limit".to_string(),
256 Some(value) => value.to_string(),
257 };
258 println!(
259 "Displayed {displayed} of {total_errors} invalid row(s) (limit: {limit_text})."
260 );
261 }
262
263 if options.show_summary && !column_summary.is_empty() {
264 println!();
265 }
266 }
267
268 if options.show_summary && !column_summary.is_empty() {
269 println!("Columns with schema violations:");
270 let mut summary_rows = column_summary
271 .iter()
272 .map(|(name, summary)| {
273 vec![
274 name.clone(),
275 summary.datatype.to_string(),
276 summary.count.to_string(),
277 ]
278 })
279 .collect::<Vec<_>>();
280 summary_rows.sort_by(|a, b| a[0].cmp(&b[0]));
281
282 let headers = vec![
283 "column".to_string(),
284 "datatype".to_string(),
285 "errors".to_string(),
286 ];
287 table::print_table(&headers, &summary_rows);
288 println!();
289 } else if options.show_detail {
290 println!();
291 }
292}
293
/// Wraps `value` in ANSI escape codes that render it red, then reset the color.
fn highlight_red(value: &str) -> String {
    const START_RED: &str = "\u{1b}[31m";
    const END_COLOR: &str = "\u{1b}[0m";

    let mut colored = String::with_capacity(START_RED.len() + value.len() + END_COLOR.len());
    colored.push_str(START_RED);
    colored.push_str(value);
    colored.push_str(END_COLOR);
    colored
}