csv_managed/
schema_cmd.rs

1use std::collections::HashSet;
2use std::fs;
3use std::path::Path;
4use std::str::FromStr;
5
6use anyhow::{Context, Result, anyhow};
7use log::info;
8use sha2::{Digest, Sha256};
9use similar::TextDiff;
10
11use crate::{
12    cli::{
13        NaPlaceholderBehavior, SchemaArgs, SchemaColumnsArgs, SchemaInferArgs, SchemaMode,
14        SchemaProbeArgs, SchemaVerifyArgs,
15    },
16    columns, io_utils, printable_delimiter,
17    schema::{self, ColumnMeta, ColumnType, InferenceStats, Schema, ValueReplacement},
18    table, verify,
19};
20
21pub fn execute(args: &SchemaArgs) -> Result<()> {
22    match &args.mode {
23        Some(SchemaMode::Probe(probe_args)) => execute_probe(probe_args),
24        Some(SchemaMode::Infer(infer_args)) => execute_infer(infer_args),
25        Some(SchemaMode::Verify(verify_args)) => execute_verify(verify_args),
26        Some(SchemaMode::Columns(columns_args)) => execute_columns(columns_args),
27        None => execute_manual(args),
28    }
29}
30
31fn execute_manual(args: &SchemaArgs) -> Result<()> {
32    if args.columns.is_empty() {
33        return Err(anyhow!(
34            "At least one --column definition is required unless using the 'probe' or 'infer' subcommands"
35        ));
36    }
37
38    let mut columns = parse_columns(&args.columns)
39        .with_context(|| "Parsing --column definitions for schema creation".to_string())?;
40    apply_replacements(&mut columns, &args.replacements)
41        .with_context(|| "Parsing --replace definitions for schema creation".to_string())?;
42
43    let output = required_output_path(
44        args.output.as_deref(),
45        "An --output path is required for schema creation",
46    )?;
47    let schema = Schema {
48        columns,
49        schema_version: None,
50        has_headers: true,
51    };
52    schema
53        .save(output)
54        .with_context(|| format!("Writing schema to {output:?}"))?;
55
56    info!(
57        "Defined schema with {} column(s) written to {:?}",
58        schema.columns.len(),
59        output
60    );
61
62    Ok(())
63}
64
65fn resolve_placeholder_policy(args: &SchemaProbeArgs) -> schema::PlaceholderPolicy {
66    match args.na_behavior {
67        NaPlaceholderBehavior::Empty => schema::PlaceholderPolicy::TreatAsEmpty,
68        NaPlaceholderBehavior::Fill => {
69            let fill = args
70                .na_fill
71                .as_deref()
72                .map(|value| value.trim())
73                .filter(|value| !value.is_empty())
74                .unwrap_or("");
75            schema::PlaceholderPolicy::FillWith(fill.to_string())
76        }
77    }
78}
79
80fn execute_probe(args: &SchemaProbeArgs) -> Result<()> {
81    let input = &args.input;
82    let delimiter = io_utils::resolve_input_delimiter(input, args.delimiter);
83    let encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
84    let placeholder_policy = resolve_placeholder_policy(args);
85    info!(
86        "Inferring schema from '{}' using delimiter '{}'",
87        input.display(),
88        printable_delimiter(delimiter)
89    );
90
91    let (mut schema, stats) = schema::infer_schema_with_stats(
92        input,
93        args.sample_rows,
94        delimiter,
95        encoding,
96        &placeholder_policy,
97        args.assume_header,
98    )
99    .with_context(|| format!("Inferring schema from {input:?}"))?;
100
101    let overrides = apply_overrides(&mut schema, &args.overrides)?;
102
103    let suggested_renames = if args.mapping {
104        Some(apply_default_name_mappings(&mut schema))
105    } else {
106        None
107    };
108
109    let report = render_probe_report(
110        &schema,
111        &stats,
112        &overrides,
113        args.sample_rows,
114        &placeholder_policy,
115        suggested_renames.as_ref(),
116    );
117    print!("{report}");
118    handle_snapshot(&report, args.snapshot.as_deref())?;
119
120    Ok(())
121}
122
123fn execute_infer(args: &SchemaInferArgs) -> Result<()> {
124    let probe = &args.probe;
125    let input_path = &probe.input;
126    let delimiter = io_utils::resolve_input_delimiter(input_path, probe.delimiter);
127    let encoding = io_utils::resolve_encoding(probe.input_encoding.as_deref())?;
128    let placeholder_policy = resolve_placeholder_policy(probe);
129    info!(
130        "Inferring schema from '{}' using delimiter '{}'",
131        input_path.display(),
132        printable_delimiter(delimiter)
133    );
134
135    let (mut schema, stats) = schema::infer_schema_with_stats(
136        input_path,
137        probe.sample_rows,
138        delimiter,
139        encoding,
140        &placeholder_policy,
141        probe.assume_header,
142    )
143    .with_context(|| format!("Inferring schema from {input_path:?}"))?;
144
145    let overrides = apply_overrides(&mut schema, &probe.overrides)?;
146
147    let suggested_renames = if probe.mapping {
148        Some(apply_default_name_mappings(&mut schema))
149    } else {
150        None
151    };
152
153    let diff_request = if let Some(path) = args.diff.as_deref() {
154        Some((
155            path.to_path_buf(),
156            fs::read_to_string(path)
157                .with_context(|| format!("Reading existing schema for diff from {path:?}"))?,
158        ))
159    } else {
160        None
161    };
162
163    let mut report: Option<String> = None;
164
165    if let Some(snapshot_path) = probe.snapshot.as_deref() {
166        let report_ref = report.get_or_insert_with(|| {
167            render_probe_report(
168                &schema,
169                &stats,
170                &overrides,
171                probe.sample_rows,
172                &placeholder_policy,
173                suggested_renames.as_ref(),
174            )
175        });
176        if !args.preview {
177            print!("{report_ref}");
178        }
179        handle_snapshot(report_ref, Some(snapshot_path))?;
180    }
181
182    let preview_requested = args.preview;
183    let should_write = !preview_requested && (args.output.is_some() || args.replace_template);
184    let diff_requested = diff_request.is_some();
185
186    if preview_requested && let Some(path) = args.output.as_deref() {
187        info!("Preview requested; suppressing write to {:?}", path);
188    }
189
190    let apply_replacements = preview_requested || should_write || diff_requested;
191    let replacements_added = if apply_replacements {
192        schema::apply_placeholder_replacements(&mut schema, &stats, &placeholder_policy)
193    } else {
194        0
195    };
196    if replacements_added > 0 {
197        info!(
198            "Added {} NA placeholder replacement(s) to schema",
199            replacements_added
200        );
201    }
202
203    let mut yaml_output = if preview_requested || diff_requested {
204        Some(
205            schema
206                .to_yaml_string(args.replace_template)
207                .with_context(|| "Serializing inferred schema to YAML".to_string())?,
208        )
209    } else {
210        None
211    };
212
213    if preview_requested {
214        println!();
215        println!("Schema YAML Preview (not written):");
216        let yaml = yaml_output
217            .as_deref()
218            .expect("Preview requires serialized YAML output");
219        print!("{yaml}");
220        if !yaml.ends_with('\n') {
221            println!();
222        }
223        info!(
224            "Previewed schema for {} column(s) (no file written)",
225            schema.columns.len()
226        );
227    } else if should_write {
228        let output = required_output_path(
229            args.output.as_deref(),
230            "An --output path is required when writing an inferred schema",
231        )?;
232        if args.replace_template {
233            schema
234                .save_with_replace_template(output)
235                .with_context(|| format!("Writing schema to {output:?}"))?;
236        } else {
237            schema
238                .save(output)
239                .with_context(|| format!("Writing schema to {output:?}"))?;
240        }
241        info!(
242            "Inferred schema for {} column(s) written to {:?}",
243            schema.columns.len(),
244            output
245        );
246    } else {
247        info!(
248            "Inferred schema for {} column(s) (no output file written)",
249            schema.columns.len()
250        );
251    }
252
253    if let Some((diff_path, existing_content)) = &diff_request {
254        if yaml_output.is_none() {
255            yaml_output = Some(
256                schema
257                    .to_yaml_string(args.replace_template)
258                    .with_context(|| "Serializing inferred schema to YAML".to_string())?,
259            );
260        }
261        let new_yaml = yaml_output
262            .as_ref()
263            .expect("Diff requires serialized YAML output");
264        println!();
265        if existing_content == new_yaml {
266            println!(
267                "Schema Diff vs {}: no changes detected.",
268                diff_path.display()
269            );
270        } else {
271            println!("Schema Diff vs {}:", diff_path.display());
272            let diff = TextDiff::from_lines(existing_content, new_yaml);
273            let diff_text = diff
274                .unified_diff()
275                .context_radius(3)
276                .header(&format!("{}", diff_path.display()), "(inferred)")
277                .to_string();
278            if diff_text.is_empty() {
279                println!("(differences detected, but diff output was empty)");
280            } else {
281                print!("{diff_text}");
282                if !diff_text.ends_with('\n') {
283                    println!();
284                }
285            }
286        }
287    }
288
289    if probe.mapping {
290        if preview_requested {
291            println!();
292        }
293        emit_mappings(&schema);
294    }
295
296    Ok(())
297}
298
299fn execute_verify(args: &SchemaVerifyArgs) -> Result<()> {
300    verify::execute(args)
301}
302
303fn execute_columns(args: &SchemaColumnsArgs) -> Result<()> {
304    columns::execute(args)
305}
306
307fn required_output_path<'a>(output: Option<&'a Path>, message: &str) -> Result<&'a Path> {
308    output.ok_or_else(|| anyhow!(message.to_string()))
309}
310
311fn parse_columns(specs: &[String]) -> Result<Vec<ColumnMeta>> {
312    let mut columns = Vec::new();
313    let mut seen = HashSet::new();
314    let mut output_names = HashSet::new();
315
316    for raw in specs {
317        for token in raw.split(',') {
318            let token = token.trim();
319            if token.is_empty() {
320                continue;
321            }
322            let (name_part, type_part) = token.split_once(':').ok_or_else(|| {
323                anyhow!("Column definition '{token}' must use the form name:type")
324            })?;
325
326            let name = name_part.trim();
327            if name.is_empty() {
328                return Err(anyhow!(
329                    "Column name cannot be empty in definition '{token}'"
330                ));
331            }
332            if !seen.insert(name.to_string()) {
333                return Err(anyhow!("Duplicate column name '{name}' provided"));
334            }
335
336            let (type_raw, rename_raw) = if let Some((ty, rename)) = type_part.split_once("->") {
337                (ty, Some(rename))
338            } else {
339                (type_part, None)
340            };
341
342            let column_type = ColumnType::from_str(type_raw.trim())
343                .map_err(|err| anyhow!("Column '{name}' has invalid type '{type_part}': {err}"))?;
344
345            let rename = rename_raw
346                .map(|value| value.trim())
347                .filter(|value| !value.is_empty())
348                .map(|value| value.to_string());
349
350            if let Some(ref alias) = rename {
351                if alias != name && seen.contains(alias) {
352                    return Err(anyhow!(
353                        "Output name '{alias}' conflicts with an existing column name"
354                    ));
355                }
356                if !output_names.insert(alias.clone()) {
357                    return Err(anyhow!("Duplicate output column name '{alias}' provided"));
358                }
359            }
360
361            if rename.is_none() {
362                output_names.insert(name.to_string());
363            }
364
365            columns.push(ColumnMeta {
366                name: name.to_string(),
367                datatype: column_type,
368                rename,
369                value_replacements: Vec::new(),
370                datatype_mappings: Vec::new(),
371            });
372        }
373    }
374
375    if columns.is_empty() {
376        return Err(anyhow!("At least one --column definition is required"));
377    }
378
379    Ok(columns)
380}
381
382fn apply_replacements(columns: &mut [ColumnMeta], specs: &[String]) -> Result<()> {
383    if specs.is_empty() {
384        return Ok(());
385    }
386    let mut lookup = HashSet::new();
387    for column in columns.iter() {
388        lookup.insert(column.name.clone());
389    }
390
391    for raw in specs {
392        let spec = raw.trim();
393        if spec.is_empty() {
394            continue;
395        }
396        let (column_name, mapping) = spec.split_once('=').ok_or_else(|| {
397            anyhow!("Replacement '{spec}' must use the form column=value->new_value")
398        })?;
399        let column_name = column_name.trim();
400        if column_name.is_empty() {
401            return Err(anyhow!("Replacement '{spec}' is missing a column name"));
402        }
403        if !lookup.contains(column_name) {
404            return Err(anyhow!(
405                "Replacement references unknown column '{column_name}'"
406            ));
407        }
408        let (from_raw, to_raw) = mapping.split_once("->").ok_or_else(|| {
409            anyhow!(
410                "Replacement '{spec}' must include '->' to separate original and replacement values"
411            )
412        })?;
413        let from = from_raw.trim().to_string();
414        let to = to_raw.trim().to_string();
415        let column = columns
416            .iter_mut()
417            .find(|c| c.name == column_name)
418            .expect("column should exist");
419        if let Some(existing) = column
420            .value_replacements
421            .iter()
422            .position(|r| r.from == from)
423        {
424            column.value_replacements.remove(existing);
425        }
426        column
427            .value_replacements
428            .push(ValueReplacement { from, to });
429    }
430
431    Ok(())
432}
433
434fn apply_overrides(schema: &mut Schema, overrides: &[String]) -> Result<HashSet<String>> {
435    if overrides.is_empty() {
436        return Ok(HashSet::new());
437    }
438
439    let mut seen = HashSet::new();
440    let mut applied = HashSet::new();
441    for raw in overrides {
442        let spec = raw.trim();
443        if spec.is_empty() {
444            continue;
445        }
446        let (name_part, type_part) = spec
447            .split_once(':')
448            .ok_or_else(|| anyhow!("Override '{spec}' must use the form name:type"))?;
449        let name = name_part.trim();
450        if name.is_empty() {
451            return Err(anyhow!("Override '{spec}' is missing a column name"));
452        }
453        if !seen.insert(name.to_string()) {
454            return Err(anyhow!("Duplicate override provided for column '{name}'"));
455        }
456
457        let override_type = ColumnType::from_str(type_part.trim()).with_context(|| {
458            format!("Override for column '{name}' has invalid type '{type_part}'")
459        })?;
460
461        let column = schema
462            .columns
463            .iter_mut()
464            .find(|col| col.name == name)
465            .ok_or_else(|| anyhow!("Override references unknown column '{name}'"))?;
466        column.datatype = override_type;
467        applied.insert(name.to_string());
468    }
469
470    Ok(applied)
471}
472
473fn apply_default_name_mappings(schema: &mut Schema) -> HashSet<String> {
474    let mut suggested = HashSet::new();
475    for column in &mut schema.columns {
476        if column.rename.is_none() {
477            let suggestion = to_lower_snake_case(&column.name);
478            column.rename = Some(suggestion);
479            suggested.insert(column.name.clone());
480        }
481    }
482    suggested
483}
484
485fn render_probe_report(
486    schema: &Schema,
487    stats: &InferenceStats,
488    overrides: &HashSet<String>,
489    requested_sample_rows: usize,
490    placeholder_policy: &schema::PlaceholderPolicy,
491    suggested_renames: Option<&HashSet<String>>,
492) -> String {
493    if schema.columns.is_empty() {
494        return "No columns inferred.\n".to_string();
495    }
496    let rows_read = stats.rows_read();
497    let headers = vec![
498        "#".to_string(),
499        "name".to_string(),
500        "type".to_string(),
501        "rename".to_string(),
502        "override".to_string(),
503        "sample".to_string(),
504        "format".to_string(),
505        "observations".to_string(),
506    ];
507    let mut rows = Vec::with_capacity(schema.columns.len());
508    for (idx, column) in schema.columns.iter().enumerate() {
509        let rename_display = column
510            .rename
511            .as_deref()
512            .filter(|value| !value.is_empty())
513            .map(|value| {
514                if suggested_renames.is_some_and(|set| set.contains(&column.name)) {
515                    format!("{value} (suggested)")
516                } else {
517                    value.to_string()
518                }
519            })
520            .unwrap_or_else(|| "—".to_string());
521        let mut status_flags = Vec::new();
522        if overrides.contains(&column.name) {
523            status_flags.push("type");
524        }
525        if column.rename.is_some() {
526            status_flags.push("mapping");
527        }
528        let status_display = if status_flags.is_empty() {
529            "—".to_string()
530        } else {
531            status_flags.join("+")
532        };
533        let sample_display = stats
534            .sample_value(idx)
535            .map(truncate_sample)
536            .unwrap_or_else(|| "—".to_string());
537        let format_display = schema::format_hint_for(&column.datatype, stats.sample_value(idx))
538            .unwrap_or_else(|| "—".to_string());
539        let observation_display = column_observation_summary(stats, idx, rows_read);
540        rows.push(vec![
541            (idx + 1).to_string(),
542            column.name.clone(),
543            column.datatype.to_string(),
544            rename_display,
545            status_display,
546            sample_display,
547            format_display,
548            observation_display,
549        ]);
550    }
551    let mut output = table::render_table(&headers, &rows);
552
553    if requested_sample_rows == 0 {
554        output.push_str(&format!("\nSampled {rows_read} row(s) (full scan).\n"));
555    } else if rows_read >= requested_sample_rows {
556        output.push_str(&format!(
557            "\nSampled {rows_read} row(s) (requested limit {requested_sample_rows}).\n"
558        ));
559    } else {
560        output.push_str(&format!(
561            "\nSampled {rows_read} row(s) out of requested {requested_sample_rows}.\n"
562        ));
563    }
564    if stats.decode_errors() > 0 {
565        output.push_str(&format!(
566            "Skipped {} value(s) due to decoding errors.\n",
567            stats.decode_errors()
568        ));
569    } else {
570        output.push_str("No decoding errors encountered.\n");
571    }
572
573    let signature = compute_schema_signature(schema);
574    output.push_str(&format!("Header+Type Hash: {signature}\n"));
575
576    if let Some(section) = render_placeholder_section(schema, stats, placeholder_policy) {
577        output.push_str(&section);
578    }
579
580    output
581}
582
/// Shortens `value` to at most 32 characters, appending `…` when anything was cut off.
///
/// The limit counts `char`s rather than bytes, so multi-byte characters are never split.
fn truncate_sample(value: &str) -> String {
    const LIMIT: usize = 32;

    let mut remaining = value.chars();
    let mut shortened: String = remaining.by_ref().take(LIMIT).collect();
    // Anything left in the iterator means the input was longer than the limit.
    if remaining.next().is_some() {
        shortened.push('…');
    }
    shortened
}
595
596fn summarize_histogram_value(value: &str) -> String {
597    let mut sanitized = String::with_capacity(value.len());
598    for ch in value.chars() {
599        match ch {
600            '\n' | '\r' | '\t' => sanitized.push(' '),
601            _ => sanitized.push(ch),
602        }
603    }
604    truncate_sample(&sanitized)
605}
606
607fn column_observation_summary(
608    stats: &InferenceStats,
609    column_index: usize,
610    rows_read: usize,
611) -> String {
612    let mut fragments = Vec::new();
613    if let Some(summary) = stats.summary(column_index) {
614        fragments.push(format!("non_empty={}", summary.non_empty));
615        let empty = rows_read.saturating_sub(summary.non_empty);
616        if rows_read > 0 && empty > 0 {
617            fragments.push(format!("empty={empty}"));
618        }
619        if !summary.tracked_values.is_empty() {
620            let histogram = summary
621                .tracked_values
622                .iter()
623                .take(3)
624                .map(|(value, count)| {
625                    let display = summarize_histogram_value(value);
626                    format!("{display} ({count})")
627                })
628                .collect::<Vec<_>>()
629                .join(", ");
630            fragments.push(format!("samples=[{histogram}]"));
631            if summary.tracked_values.len() > 3 {
632                fragments.push("samples+=...".to_string());
633            }
634        }
635        if summary.other_values > 0 {
636            fragments.push(format!("others={}", summary.other_values));
637        }
638    }
639
640    if let Some(placeholders) = stats.placeholder_summary(column_index) {
641        let entries = placeholders.entries();
642        if !entries.is_empty() {
643            let tokens = entries
644                .iter()
645                .take(3)
646                .map(|(token, count)| {
647                    let display = truncate_sample(token);
648                    format!("{display} ({count})")
649                })
650                .collect::<Vec<_>>()
651                .join(", ");
652            fragments.push(format!("placeholders=[{tokens}]"));
653            if entries.len() > 3 {
654                fragments.push("placeholders+=...".to_string());
655            }
656        }
657    }
658
659    if fragments.is_empty() {
660        "—".to_string()
661    } else {
662        fragments.join("; ")
663    }
664}
665
666fn render_placeholder_section(
667    schema: &Schema,
668    stats: &InferenceStats,
669    placeholder_policy: &schema::PlaceholderPolicy,
670) -> Option<String> {
671    let mut blocks = Vec::new();
672    for (idx, column) in schema.columns.iter().enumerate() {
673        let Some(summary) = stats.placeholder_summary(idx) else {
674            continue;
675        };
676        let entries = summary.entries();
677        if entries.is_empty() {
678            continue;
679        }
680        let mut block = String::new();
681        let type_note = if column.datatype != ColumnType::String {
682            " (non-string)"
683        } else {
684            ""
685        };
686        block.push_str(&format!(
687            "  • {} ({}{})\n",
688            column.name, column.datatype, type_note
689        ));
690        let tokens = entries
691            .iter()
692            .map(|(token, count)| format!("{token} ({count})"))
693            .collect::<Vec<_>>()
694            .join(", ");
695        block.push_str(&format!("    tokens: {tokens}\n"));
696        block.push_str("    replacements:\n");
697        let target_display = match placeholder_policy {
698            schema::PlaceholderPolicy::TreatAsEmpty => "\"\"".to_string(),
699            schema::PlaceholderPolicy::FillWith(value) => format!("\"{value}\""),
700        };
701        for (token, _) in entries {
702            block.push_str(&format!(
703                "      - from \"{token}\" -> to {target_display}\n"
704            ));
705        }
706        blocks.push(block);
707    }
708
709    if blocks.is_empty() {
710        return None;
711    }
712
713    let mut section = match placeholder_policy {
714        schema::PlaceholderPolicy::TreatAsEmpty => {
715            "\nPlaceholder Suggestions (replace with empty string):\n".to_string()
716        }
717        schema::PlaceholderPolicy::FillWith(value) => {
718            format!("\nPlaceholder Suggestions (replace with '{value}'):\n")
719        }
720    };
721    for block in blocks {
722        section.push_str(&block);
723    }
724    Some(section)
725}
726
727fn compute_schema_signature(schema: &Schema) -> String {
728    let mut hasher = Sha256::new();
729    for column in &schema.columns {
730        hasher.update(column.name.as_bytes());
731        hasher.update(b":");
732        hasher.update(column.datatype.signature_token().as_bytes());
733        hasher.update(b";");
734    }
735    format!("{:x}", hasher.finalize())
736}
737
738fn emit_mappings(schema: &Schema) {
739    if schema.columns.is_empty() {
740        println!("No columns found to emit mappings.");
741        return;
742    }
743    let mut rows = Vec::with_capacity(schema.columns.len());
744    for (idx, column) in schema.columns.iter().enumerate() {
745        let mapping = format!("{}:{}->", column.name, column.datatype.cli_token());
746        let suggested = column
747            .rename
748            .as_ref()
749            .filter(|value| !value.is_empty())
750            .cloned()
751            .unwrap_or_else(|| to_lower_snake_case(&column.name));
752        rows.push(vec![
753            (idx + 1).to_string(),
754            column.name.clone(),
755            column.datatype.to_string(),
756            mapping,
757            suggested,
758        ]);
759    }
760    let headers = vec![
761        "#".to_string(),
762        "name".to_string(),
763        "type".to_string(),
764        "mapping".to_string(),
765        "suggested".to_string(),
766    ];
767    table::print_table(&headers, &rows);
768}
769
770fn handle_snapshot(report: &str, snapshot_path: Option<&Path>) -> Result<()> {
771    let Some(path) = snapshot_path else {
772        return Ok(());
773    };
774
775    if path.exists() {
776        let expected =
777            fs::read_to_string(path).with_context(|| format!("Reading snapshot from {path:?}"))?;
778        if expected != report {
779            return Err(anyhow!(
780                "Probe output does not match snapshot at {path:?}. Inspect differences and update the snapshot if the change is intentional."
781            ));
782        }
783    } else {
784        if let Some(parent) = path.parent().filter(|p| !p.as_os_str().is_empty()) {
785            fs::create_dir_all(parent)
786                .with_context(|| format!("Creating snapshot directory {parent:?}"))?;
787        }
788        fs::write(path, report).with_context(|| format!("Writing snapshot to {path:?}"))?;
789        eprintln!("Snapshot captured at {path:?}");
790    }
791
792    Ok(())
793}
794
/// Converts a column name to `lower_snake_case`.
///
/// ASCII letters and digits are kept (letters lowercased); every run of other
/// characters becomes a single `_`, with leading and trailing separators
/// dropped. An underscore is inserted before an uppercase letter that follows
/// a lowercase letter or digit, and before the last letter of an uppercase run
/// when it is followed by a lowercase letter (`HTTPStatus` -> `http_status`).
/// When nothing alphanumeric remains, the ASCII-lowercased input is returned.
fn to_lower_snake_case(value: &str) -> String {
    /// What the previously consumed character was.
    #[derive(Clone, Copy)]
    enum Prev {
        /// Start of input or a non-alphanumeric separator.
        Boundary,
        /// An ASCII uppercase letter.
        Upper,
        /// An ASCII lowercase letter or digit.
        LowerOrDigit,
    }

    let mut snake = String::new();
    let mut prev = Prev::Boundary;
    let mut remaining = value.chars().peekable();

    while let Some(current) = remaining.next() {
        if !current.is_ascii_alphanumeric() {
            // Collapse separator runs and never start the result with one.
            if !snake.is_empty() && !snake.ends_with('_') {
                snake.push('_');
            }
            prev = Prev::Boundary;
            continue;
        }

        if current.is_ascii_uppercase() {
            let followed_by_lower = remaining
                .peek()
                .is_some_and(|next| next.is_ascii_lowercase());
            let starts_word = match prev {
                Prev::Boundary => false,
                Prev::LowerOrDigit => true,
                Prev::Upper => followed_by_lower,
            };
            if starts_word {
                snake.push('_');
            }
            snake.push(current.to_ascii_lowercase());
            prev = Prev::Upper;
        } else {
            // Already a lowercase letter or a digit.
            snake.push(current);
            prev = Prev::LowerOrDigit;
        }
    }

    let kept = snake.trim_end_matches('_').len();
    snake.truncate(kept);

    if snake.is_empty() {
        value.to_ascii_lowercase()
    } else {
        snake
    }
}
841
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns string literals into the owned spec list the parsers expect.
    fn owned(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// Builds a column with no value replacements or datatype mappings.
    fn column(name: &str, datatype: ColumnType, rename: Option<&str>) -> ColumnMeta {
        ColumnMeta {
            name: name.to_string(),
            datatype,
            rename: rename.map(str::to_string),
            value_replacements: Vec::new(),
            datatype_mappings: Vec::new(),
        }
    }

    /// Wraps columns in a header-bearing schema without a version.
    fn schema_of(columns: Vec<ColumnMeta>) -> Schema {
        Schema {
            columns,
            schema_version: None,
            has_headers: true,
        }
    }

    #[test]
    fn parse_columns_accepts_comma_and_repeats() {
        let specs = owned(&["id:integer,name:string", "amount:float"]);
        let columns = parse_columns(&specs).expect("parsed");
        assert_eq!(columns.len(), 3);
        assert_eq!(columns[0].name, "id");
        assert_eq!(columns[1].name, "name");
        assert_eq!(columns[2].name, "amount");
        assert_eq!(columns[0].datatype, ColumnType::Integer);
        assert_eq!(columns[1].datatype, ColumnType::String);
        assert_eq!(columns[2].datatype, ColumnType::Float);
    }

    #[test]
    fn duplicate_columns_are_rejected() {
        let err = parse_columns(&owned(&["id:integer,id:string"])).unwrap_err();
        assert!(err.to_string().contains("Duplicate column name"));
    }

    #[test]
    fn missing_type_is_rejected() {
        let err = parse_columns(&owned(&["id"])).unwrap_err();
        assert!(err.to_string().contains("must use the form"));
    }

    #[test]
    fn parse_columns_supports_output_rename() {
        let specs = owned(&["id:integer->Identifier,name:string"]);
        let columns = parse_columns(&specs).expect("parsed");
        assert_eq!(columns.len(), 2);
        assert_eq!(columns[0].rename.as_deref(), Some("Identifier"));
        assert!(columns[1].rename.is_none());
    }

    #[test]
    fn duplicate_output_names_are_rejected() {
        let specs = owned(&["id:integer->Identifier", "code:string->Identifier"]);
        let err = parse_columns(&specs).unwrap_err();
        assert!(err.to_string().contains("Duplicate output column name"));
    }

    #[test]
    fn replacements_apply_to_columns() {
        let mut columns = parse_columns(&owned(&["status:string"])).expect("parsed");
        let replacements = owned(&["status=pending->shipped"]);
        apply_replacements(&mut columns, &replacements).expect("applied");
        assert_eq!(columns[0].value_replacements.len(), 1);
        assert_eq!(columns[0].value_replacements[0].from, "pending");
        assert_eq!(columns[0].value_replacements[0].to, "shipped");
    }

    #[test]
    fn replacements_validate_column_names() {
        let mut columns = parse_columns(&owned(&["status:string"])).expect("parsed");
        let replacements = owned(&["missing=pending->shipped"]);
        let err = apply_replacements(&mut columns, &replacements).unwrap_err();
        assert!(err.to_string().contains("unknown column"));
    }

    #[test]
    fn to_lower_snake_case_converts_names() {
        let cases = [
            ("OrderDate", "order_date"),
            ("customer-name", "customer_name"),
            ("customer  name", "customer_name"),
            ("APIKey", "api_key"),
            ("HTTPStatus", "http_status"),
        ];
        for (input, expected) in cases {
            assert_eq!(to_lower_snake_case(input), expected);
        }
    }

    #[test]
    fn apply_overrides_updates_types() {
        let mut schema = schema_of(vec![column("amount", ColumnType::Float, None)]);
        // The blank entry must be skipped rather than rejected.
        let overrides = owned(&["amount:integer", ""]);
        let applied = apply_overrides(&mut schema, &overrides).unwrap();
        assert_eq!(schema.columns[0].datatype, ColumnType::Integer);
        assert!(applied.contains("amount"));
    }

    #[test]
    fn apply_default_name_mappings_returns_suggested_set() {
        let mut schema = schema_of(vec![
            column("OrderID", ColumnType::Integer, None),
            column("CustomerName", ColumnType::String, Some("customer_name")),
        ]);

        let suggested = apply_default_name_mappings(&mut schema);

        assert_eq!(schema.columns[0].rename.as_deref(), Some("order_id"));
        assert_eq!(schema.columns[1].rename.as_deref(), Some("customer_name"));
        assert!(suggested.contains("OrderID"));
        assert!(!suggested.contains("CustomerName"));
    }
}
973}