csv_managed/
verify.rs

1use std::{collections::HashMap, path::Path};
2
3use anyhow::{Context, Result, anyhow};
4use log::info;
5
6use crate::{
7    cli::VerifyArgs,
8    data::parse_typed_value,
9    io_utils,
10    schema::{ColumnType, Schema},
11    table,
12};
13
14pub fn execute(args: &VerifyArgs) -> Result<()> {
15    let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
16    let schema = Schema::load(&args.schema)
17        .with_context(|| format!("Loading schema from {schema:?}", schema = args.schema))?;
18    let report_config = args
19        .report_invalid
20        .as_ref()
21        .map(|values| parse_report_invalid_options(values))
22        .transpose()?;
23    for input in &args.inputs {
24        let delimiter = io_utils::resolve_input_delimiter(input, args.delimiter);
25        validate_file_against_schema(&schema, input, delimiter, input_encoding, report_config)?;
26        info!("✓ {input:?} matches schema");
27    }
28    Ok(())
29}
30
/// One schema violation captured for the detailed invalid-value listing.
#[derive(Debug, Clone)]
struct InvalidEntry {
    /// Row number shown to the user; the validator stores `row_idx + 2` here.
    row_number: usize,
    /// Output name of the column the value belongs to.
    column_name: String,
    /// Datatype declared for that column in the schema.
    datatype: ColumnType,
    /// Cell text as decoded from the file, before normalization.
    raw_value: String,
    /// Normalized text; only populated when normalization changed the raw value.
    normalized_value: Option<String>,
    /// Text of the validation error for this value.
    reason: String,
}
40
/// Reporting options parsed from the `--report-invalid` values.
#[derive(Debug, Clone, Copy)]
struct InvalidReportOptions {
    /// Print a table listing individual invalid values.
    show_detail: bool,
    /// Print a per-column table of violation counts.
    show_summary: bool,
    /// Maximum number of invalid values collected for the detail table;
    /// `None` leaves the collection uncapped.
    limit: Option<usize>,
}
47
/// Running tally of schema violations for a single column.
#[derive(Debug, Clone)]
struct ColumnSummary {
    /// Datatype declared for the column in the schema.
    datatype: ColumnType,
    /// Number of invalid values counted for the column so far.
    count: usize,
}
53
54fn parse_report_invalid_options(values: &[String]) -> Result<InvalidReportOptions> {
55    let mut show_detail = false;
56    let mut show_summary = false;
57    let mut limit = None;
58
59    for token in values {
60        if token.is_empty() {
61            continue;
62        }
63        let lowered = token.to_ascii_lowercase();
64        match lowered.as_str() {
65            "detail" => {
66                show_detail = true;
67            }
68            "summary" => {
69                show_summary = true;
70            }
71            _ => {
72                if let Ok(parsed) = token.parse::<usize>() {
73                    limit = Some(parsed);
74                    if !show_detail && !show_summary {
75                        show_detail = true;
76                    }
77                } else {
78                    return Err(anyhow!(
79                        "Invalid value '{token}' for --report-invalid; expected 'detail', 'summary', or a positive integer limit"
80                    ));
81                }
82            }
83        }
84    }
85
86    if !show_detail && !show_summary {
87        show_summary = true;
88    }
89
90    Ok(InvalidReportOptions {
91        show_detail,
92        show_summary,
93        limit,
94    })
95}
96
97fn validate_file_against_schema(
98    schema: &Schema,
99    path: &Path,
100    delimiter: u8,
101    encoding: &'static encoding_rs::Encoding,
102    report: Option<InvalidReportOptions>,
103) -> Result<()> {
104    let mut reader = io_utils::open_csv_reader_from_path(path, delimiter, true)?;
105    let headers = io_utils::reader_headers(&mut reader, encoding)?;
106    schema
107        .validate_headers(&headers)
108        .map_err(|err| anyhow!("Validating headers for {path:?}: {err}"))?;
109
110    let report_cfg = report;
111    let detail_enabled = report_cfg.is_some_and(|cfg| cfg.show_detail);
112    let summary_enabled = report_cfg.is_some_and(|cfg| cfg.show_summary);
113    let report_enabled = detail_enabled || summary_enabled;
114    let collection_limit = if detail_enabled {
115        report_cfg.and_then(|cfg| cfg.limit).unwrap_or(usize::MAX)
116    } else {
117        0
118    };
119
120    let mut invalid_entries = Vec::new();
121    let mut column_summary: HashMap<String, ColumnSummary> = HashMap::new();
122    let mut total_errors = 0usize;
123
124    for (row_idx, record) in reader.byte_records().enumerate() {
125        let record = record.with_context(|| format!("Reading row {} in {path:?}", row_idx + 2))?;
126        let decoded = io_utils::decode_record(&record, encoding)?;
127        for (col_idx, column) in schema.columns.iter().enumerate() {
128            let raw_value = decoded.get(col_idx).map(|s| s.as_str()).unwrap_or("");
129            let normalized = column.normalize_value(raw_value);
130            if let Err(err) = validate_value(normalized.as_ref(), &column.datatype) {
131                if !report_enabled {
132                    let message = if normalized.as_ref() == raw_value {
133                        format!(
134                            "Row {} column '{}': value {:?}\nReason: {}",
135                            row_idx + 2,
136                            column.output_name(),
137                            raw_value,
138                            err
139                        )
140                    } else {
141                        format!(
142                            "Row {} column '{}': value {:?} (normalized {:?})\nReason: {}",
143                            row_idx + 2,
144                            column.output_name(),
145                            raw_value,
146                            normalized.as_ref(),
147                            err
148                        )
149                    };
150                    return Err(anyhow!(message));
151                }
152
153                let normalized_owned = normalized.into_owned();
154                let normalized_changed = normalized_owned != raw_value;
155
156                total_errors += 1;
157                if detail_enabled && invalid_entries.len() < collection_limit {
158                    let normalized_value = if normalized_changed {
159                        Some(normalized_owned.clone())
160                    } else {
161                        None
162                    };
163                    invalid_entries.push(InvalidEntry {
164                        row_number: row_idx + 2,
165                        column_name: column.output_name().to_string(),
166                        datatype: column.datatype.clone(),
167                        raw_value: raw_value.to_string(),
168                        normalized_value,
169                        reason: err.to_string(),
170                    });
171                }
172
173                column_summary
174                    .entry(column.output_name().to_string())
175                    .and_modify(|summary| summary.count += 1)
176                    .or_insert_with(|| ColumnSummary {
177                        datatype: column.datatype.clone(),
178                        count: 1,
179                    });
180            }
181        }
182    }
183
184    if report_enabled && total_errors > 0 {
185        if let Some(cfg) = report_cfg {
186            print_invalid_report(path, &invalid_entries, &column_summary, total_errors, cfg);
187        }
188        return Err(anyhow!(format!(
189            "Found {total_errors} invalid value(s) in {path:?}"
190        )));
191    }
192
193    Ok(())
194}
195
196fn validate_value(value: &str, column_type: &ColumnType) -> Result<()> {
197    if value.is_empty() {
198        return Ok(());
199    }
200    parse_typed_value(value, column_type).map(|_| ())
201}
202
203fn print_invalid_report(
204    path: &Path,
205    entries: &[InvalidEntry],
206    column_summary: &HashMap<String, ColumnSummary>,
207    total_errors: usize,
208    options: InvalidReportOptions,
209) {
210    println!();
211
212    if options.show_detail {
213        println!("Invalid rows in {}:", path.display());
214
215        let displayed = entries.len();
216
217        if entries.is_empty() {
218            println!(
219                "No sample rows captured; use '--report-invalid:detail <LIMIT>' with a higher LIMIT to display examples."
220            );
221        } else {
222            let headers = vec![
223                "row".to_string(),
224                "column".to_string(),
225                "raw".to_string(),
226                "value".to_string(),
227                "datatype".to_string(),
228                "reason".to_string(),
229            ];
230
231            let rows = entries
232                .iter()
233                .map(|entry| {
234                    let highlight_target =
235                        entry.normalized_value.as_ref().unwrap_or(&entry.raw_value);
236                    let highlighted = highlight_red(highlight_target);
237
238                    vec![
239                        entry.row_number.to_string(),
240                        entry.column_name.clone(),
241                        entry.raw_value.clone(),
242                        highlighted,
243                        entry.datatype.to_string(),
244                        entry.reason.clone(),
245                    ]
246                })
247                .collect::<Vec<_>>();
248
249            table::print_table(&headers, &rows);
250        }
251
252        if total_errors > displayed {
253            println!();
254            let limit_text = match options.limit {
255                None => "no limit".to_string(),
256                Some(value) => value.to_string(),
257            };
258            println!(
259                "Displayed {displayed} of {total_errors} invalid row(s) (limit: {limit_text})."
260            );
261        }
262
263        if options.show_summary && !column_summary.is_empty() {
264            println!();
265        }
266    }
267
268    if options.show_summary && !column_summary.is_empty() {
269        println!("Columns with schema violations:");
270        let mut summary_rows = column_summary
271            .iter()
272            .map(|(name, summary)| {
273                vec![
274                    name.clone(),
275                    summary.datatype.to_string(),
276                    summary.count.to_string(),
277                ]
278            })
279            .collect::<Vec<_>>();
280        summary_rows.sort_by(|a, b| a[0].cmp(&b[0]));
281
282        let headers = vec![
283            "column".to_string(),
284            "datatype".to_string(),
285            "errors".to_string(),
286        ];
287        table::print_table(&headers, &summary_rows);
288        println!();
289    } else if options.show_detail {
290        println!();
291    }
292}
293
/// Wraps `value` in the ANSI escape sequences for red foreground text,
/// followed by a reset of all attributes.
fn highlight_red(value: &str) -> String {
    const START_RED: &str = "\u{1b}[31m";
    const END_STYLE: &str = "\u{1b}[0m";
    [START_RED, value, END_STYLE].concat()
}