csv_managed/
verify.rs

1use std::{collections::HashMap, path::Path};
2
3use anyhow::{Context, Result, anyhow};
4use log::info;
5
6use crate::{
7    cli::SchemaVerifyArgs,
8    data::parse_typed_value,
9    io_utils,
10    schema::{ColumnType, Schema},
11    table,
12};
13
14pub fn execute(args: &SchemaVerifyArgs) -> Result<()> {
15    let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
16    let schema = Schema::load(&args.schema)
17        .with_context(|| format!("Loading schema from {schema:?}", schema = args.schema))?;
18    let report_config = args
19        .report_invalid
20        .as_ref()
21        .map(|values| parse_report_invalid_options(values))
22        .transpose()?;
23    for input in &args.inputs {
24        let delimiter = io_utils::resolve_input_delimiter(input, args.delimiter);
25        validate_file_against_schema(&schema, input, delimiter, input_encoding, report_config)?;
26        info!("✓ {input:?} matches schema");
27    }
28    Ok(())
29}
30
31#[derive(Debug, Clone)]
32struct InvalidEntry {
33    row_number: usize,
34    column_name: String,
35    datatype: ColumnType,
36    raw_value: String,
37    normalized_value: Option<String>,
38    reason: String,
39}
40
/// Parsed form of the `--report-invalid` option.
#[derive(Clone, Copy, Debug)]
struct InvalidReportOptions {
    /// Print the table listing individual invalid values.
    show_detail: bool,
    /// Print the per-column table of violation counts.
    show_summary: bool,
    /// Maximum number of invalid values collected for the detail table;
    /// `None` means no cap.
    limit: Option<usize>,
}
47
48#[derive(Debug, Clone)]
49struct ColumnSummary {
50    datatype: ColumnType,
51    count: usize,
52}
53
54fn parse_report_invalid_options(values: &[String]) -> Result<InvalidReportOptions> {
55    let mut show_detail = false;
56    let mut show_summary = false;
57    let mut limit = None;
58
59    for token in values {
60        if token.is_empty() {
61            continue;
62        }
63        let lowered = token.to_ascii_lowercase();
64        match lowered.as_str() {
65            "detail" => {
66                show_detail = true;
67            }
68            "summary" => {
69                show_summary = true;
70            }
71            _ => {
72                if let Ok(parsed) = token.parse::<usize>() {
73                    limit = Some(parsed);
74                    if !show_detail && !show_summary {
75                        show_detail = true;
76                    }
77                } else {
78                    return Err(anyhow!(
79                        "Invalid value '{token}' for --report-invalid; expected 'detail', 'summary', or a positive integer limit"
80                    ));
81                }
82            }
83        }
84    }
85
86    if !show_detail && !show_summary {
87        show_summary = true;
88    }
89
90    Ok(InvalidReportOptions {
91        show_detail,
92        show_summary,
93        limit,
94    })
95}
96
97fn validate_file_against_schema(
98    schema: &Schema,
99    path: &Path,
100    delimiter: u8,
101    encoding: &'static encoding_rs::Encoding,
102    report: Option<InvalidReportOptions>,
103) -> Result<()> {
104    let mut reader =
105        io_utils::open_csv_reader_from_path(path, delimiter, schema.expects_headers())?;
106    if schema.expects_headers() {
107        let headers = io_utils::reader_headers(&mut reader, encoding)?;
108        schema
109            .validate_headers(&headers)
110            .map_err(|err| anyhow!("Validating headers for {path:?}: {err}"))?;
111    }
112
113    let report_cfg = report;
114    let detail_enabled = report_cfg.is_some_and(|cfg| cfg.show_detail);
115    let summary_enabled = report_cfg.is_some_and(|cfg| cfg.show_summary);
116    let report_enabled = detail_enabled || summary_enabled;
117    let collection_limit = if detail_enabled {
118        report_cfg.and_then(|cfg| cfg.limit).unwrap_or(usize::MAX)
119    } else {
120        0
121    };
122
123    let mut invalid_entries = Vec::new();
124    let mut column_summary: HashMap<String, ColumnSummary> = HashMap::new();
125    let mut total_errors = 0usize;
126
127    for (row_idx, record) in reader.byte_records().enumerate() {
128        let record = record.with_context(|| format!("Reading row {} in {path:?}", row_idx + 2))?;
129        let decoded = io_utils::decode_record(&record, encoding)?;
130        let mut transformed = decoded.clone();
131        if schema.has_transformations() {
132            schema
133                .apply_transformations_to_row(&mut transformed)
134                .with_context(|| {
135                    format!(
136                        "Applying datatype mappings to row {} in {path:?}",
137                        row_idx + 2
138                    )
139                })?;
140        }
141        schema.apply_replacements_to_row(&mut transformed);
142        for (col_idx, column) in schema.columns.iter().enumerate() {
143            let raw_value = decoded.get(col_idx).map(|s| s.as_str()).unwrap_or("");
144            let normalized_value = transformed.get(col_idx).map(|s| s.as_str()).unwrap_or("");
145            if let Err(err) = validate_value(normalized_value, &column.datatype) {
146                if !report_enabled {
147                    let message = if normalized_value == raw_value {
148                        format!(
149                            "Row {} column '{}': value {:?}\nReason: {}",
150                            row_idx + 2,
151                            column.output_name(),
152                            raw_value,
153                            err
154                        )
155                    } else {
156                        format!(
157                            "Row {} column '{}': value {:?} (normalized {:?})\nReason: {}",
158                            row_idx + 2,
159                            column.output_name(),
160                            raw_value,
161                            normalized_value,
162                            err
163                        )
164                    };
165                    return Err(anyhow!(message));
166                }
167
168                let normalized_owned = normalized_value.to_string();
169                let normalized_changed = normalized_owned != raw_value;
170
171                total_errors += 1;
172                if detail_enabled && invalid_entries.len() < collection_limit {
173                    let normalized_value = if normalized_changed {
174                        Some(normalized_owned.clone())
175                    } else {
176                        None
177                    };
178                    invalid_entries.push(InvalidEntry {
179                        row_number: row_idx + 2,
180                        column_name: column.output_name().to_string(),
181                        datatype: column.datatype.clone(),
182                        raw_value: raw_value.to_string(),
183                        normalized_value,
184                        reason: err.to_string(),
185                    });
186                }
187
188                column_summary
189                    .entry(column.output_name().to_string())
190                    .and_modify(|summary| summary.count += 1)
191                    .or_insert_with(|| ColumnSummary {
192                        datatype: column.datatype.clone(),
193                        count: 1,
194                    });
195            }
196        }
197    }
198
199    if report_enabled && total_errors > 0 {
200        if let Some(cfg) = report_cfg {
201            print_invalid_report(path, &invalid_entries, &column_summary, total_errors, cfg);
202        }
203        return Err(anyhow!(format!(
204            "Found {total_errors} invalid value(s) in {path:?}"
205        )));
206    }
207
208    Ok(())
209}
210
211fn validate_value(value: &str, column_type: &ColumnType) -> Result<()> {
212    if value.is_empty() {
213        return Ok(());
214    }
215    parse_typed_value(value, column_type).map(|_| ())
216}
217
218fn print_invalid_report(
219    path: &Path,
220    entries: &[InvalidEntry],
221    column_summary: &HashMap<String, ColumnSummary>,
222    total_errors: usize,
223    options: InvalidReportOptions,
224) {
225    println!();
226
227    if options.show_detail {
228        println!("Invalid rows in {}:", path.display());
229
230        let displayed = entries.len();
231
232        if entries.is_empty() {
233            println!(
234                "No sample rows captured; use '--report-invalid:detail <LIMIT>' with a higher LIMIT to display examples."
235            );
236        } else {
237            let headers = vec![
238                "row".to_string(),
239                "column".to_string(),
240                "raw".to_string(),
241                "value".to_string(),
242                "datatype".to_string(),
243                "reason".to_string(),
244            ];
245
246            let rows = entries
247                .iter()
248                .map(|entry| {
249                    let highlight_target =
250                        entry.normalized_value.as_ref().unwrap_or(&entry.raw_value);
251                    let highlighted = highlight_red(highlight_target);
252
253                    vec![
254                        entry.row_number.to_string(),
255                        entry.column_name.clone(),
256                        entry.raw_value.clone(),
257                        highlighted,
258                        entry.datatype.to_string(),
259                        entry.reason.clone(),
260                    ]
261                })
262                .collect::<Vec<_>>();
263
264            table::print_table(&headers, &rows);
265        }
266
267        if total_errors > displayed {
268            println!();
269            let limit_text = match options.limit {
270                None => "no limit".to_string(),
271                Some(value) => value.to_string(),
272            };
273            println!(
274                "Displayed {displayed} of {total_errors} invalid row(s) (limit: {limit_text})."
275            );
276        }
277
278        if options.show_summary && !column_summary.is_empty() {
279            println!();
280        }
281    }
282
283    if options.show_summary && !column_summary.is_empty() {
284        println!("Columns with schema violations:");
285        let mut summary_rows = column_summary
286            .iter()
287            .map(|(name, summary)| {
288                vec![
289                    name.clone(),
290                    summary.datatype.to_string(),
291                    summary.count.to_string(),
292                ]
293            })
294            .collect::<Vec<_>>();
295        summary_rows.sort_by(|a, b| a[0].cmp(&b[0]));
296
297        let headers = vec![
298            "column".to_string(),
299            "datatype".to_string(),
300            "errors".to_string(),
301        ];
302        table::print_table(&headers, &summary_rows);
303        println!();
304    } else if options.show_detail {
305        println!();
306    }
307}
308
/// Wraps `value` in the ANSI escape sequences for red foreground text,
/// followed by an attribute reset.
fn highlight_red(value: &str) -> String {
    const RED: &str = "\u{1b}[31m";
    const RESET: &str = "\u{1b}[0m";
    [RED, value, RESET].concat()
}