1use std::collections::HashSet;
2use std::fs;
3use std::path::Path;
4use std::str::FromStr;
5
6use anyhow::{Context, Result, anyhow};
7use log::info;
8use sha2::{Digest, Sha256};
9use similar::TextDiff;
10
11use crate::{
12 cli::{
13 NaPlaceholderBehavior, SchemaArgs, SchemaColumnsArgs, SchemaInferArgs, SchemaMode,
14 SchemaProbeArgs, SchemaVerifyArgs,
15 },
16 columns, io_utils, printable_delimiter,
17 schema::{self, ColumnMeta, ColumnType, InferenceStats, Schema, ValueReplacement},
18 table, verify,
19};
20
21pub fn execute(args: &SchemaArgs) -> Result<()> {
22 match &args.mode {
23 Some(SchemaMode::Probe(probe_args)) => execute_probe(probe_args),
24 Some(SchemaMode::Infer(infer_args)) => execute_infer(infer_args),
25 Some(SchemaMode::Verify(verify_args)) => execute_verify(verify_args),
26 Some(SchemaMode::Columns(columns_args)) => execute_columns(columns_args),
27 None => execute_manual(args),
28 }
29}
30
31fn execute_manual(args: &SchemaArgs) -> Result<()> {
32 if args.columns.is_empty() {
33 return Err(anyhow!(
34 "At least one --column definition is required unless using the 'probe' or 'infer' subcommands"
35 ));
36 }
37
38 let mut columns = parse_columns(&args.columns)
39 .with_context(|| "Parsing --column definitions for schema creation".to_string())?;
40 apply_replacements(&mut columns, &args.replacements)
41 .with_context(|| "Parsing --replace definitions for schema creation".to_string())?;
42
43 let output = required_output_path(
44 args.output.as_deref(),
45 "An --output path is required for schema creation",
46 )?;
47 let schema = Schema {
48 columns,
49 schema_version: None,
50 has_headers: true,
51 };
52 schema
53 .save(output)
54 .with_context(|| format!("Writing schema to {output:?}"))?;
55
56 info!(
57 "Defined schema with {} column(s) written to {:?}",
58 schema.columns.len(),
59 output
60 );
61
62 Ok(())
63}
64
65fn resolve_placeholder_policy(args: &SchemaProbeArgs) -> schema::PlaceholderPolicy {
66 match args.na_behavior {
67 NaPlaceholderBehavior::Empty => schema::PlaceholderPolicy::TreatAsEmpty,
68 NaPlaceholderBehavior::Fill => {
69 let fill = args
70 .na_fill
71 .as_deref()
72 .map(|value| value.trim())
73 .filter(|value| !value.is_empty())
74 .unwrap_or("");
75 schema::PlaceholderPolicy::FillWith(fill.to_string())
76 }
77 }
78}
79
80fn execute_probe(args: &SchemaProbeArgs) -> Result<()> {
81 let input = &args.input;
82 let delimiter = io_utils::resolve_input_delimiter(input, args.delimiter);
83 let encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
84 let placeholder_policy = resolve_placeholder_policy(args);
85 info!(
86 "Inferring schema from '{}' using delimiter '{}'",
87 input.display(),
88 printable_delimiter(delimiter)
89 );
90
91 let (mut schema, stats) = schema::infer_schema_with_stats(
92 input,
93 args.sample_rows,
94 delimiter,
95 encoding,
96 &placeholder_policy,
97 args.assume_header,
98 )
99 .with_context(|| format!("Inferring schema from {input:?}"))?;
100
101 let overrides = apply_overrides(&mut schema, &args.overrides)?;
102
103 let suggested_renames = if args.mapping {
104 Some(apply_default_name_mappings(&mut schema))
105 } else {
106 None
107 };
108
109 let report = render_probe_report(
110 &schema,
111 &stats,
112 &overrides,
113 args.sample_rows,
114 &placeholder_policy,
115 suggested_renames.as_ref(),
116 );
117 print!("{report}");
118 handle_snapshot(&report, args.snapshot.as_deref())?;
119
120 Ok(())
121}
122
123fn execute_infer(args: &SchemaInferArgs) -> Result<()> {
124 let probe = &args.probe;
125 let input_path = &probe.input;
126 let delimiter = io_utils::resolve_input_delimiter(input_path, probe.delimiter);
127 let encoding = io_utils::resolve_encoding(probe.input_encoding.as_deref())?;
128 let placeholder_policy = resolve_placeholder_policy(probe);
129 info!(
130 "Inferring schema from '{}' using delimiter '{}'",
131 input_path.display(),
132 printable_delimiter(delimiter)
133 );
134
135 let (mut schema, stats) = schema::infer_schema_with_stats(
136 input_path,
137 probe.sample_rows,
138 delimiter,
139 encoding,
140 &placeholder_policy,
141 probe.assume_header,
142 )
143 .with_context(|| format!("Inferring schema from {input_path:?}"))?;
144
145 let overrides = apply_overrides(&mut schema, &probe.overrides)?;
146
147 let suggested_renames = if probe.mapping {
148 Some(apply_default_name_mappings(&mut schema))
149 } else {
150 None
151 };
152
153 let diff_request = if let Some(path) = args.diff.as_deref() {
154 Some((
155 path.to_path_buf(),
156 fs::read_to_string(path)
157 .with_context(|| format!("Reading existing schema for diff from {path:?}"))?,
158 ))
159 } else {
160 None
161 };
162
163 let mut report: Option<String> = None;
164
165 if let Some(snapshot_path) = probe.snapshot.as_deref() {
166 let report_ref = report.get_or_insert_with(|| {
167 render_probe_report(
168 &schema,
169 &stats,
170 &overrides,
171 probe.sample_rows,
172 &placeholder_policy,
173 suggested_renames.as_ref(),
174 )
175 });
176 if !args.preview {
177 print!("{report_ref}");
178 }
179 handle_snapshot(report_ref, Some(snapshot_path))?;
180 }
181
182 let preview_requested = args.preview;
183 let should_write = !preview_requested && (args.output.is_some() || args.replace_template);
184 let diff_requested = diff_request.is_some();
185
186 if preview_requested && let Some(path) = args.output.as_deref() {
187 info!("Preview requested; suppressing write to {:?}", path);
188 }
189
190 let apply_replacements = preview_requested || should_write || diff_requested;
191 let replacements_added = if apply_replacements {
192 schema::apply_placeholder_replacements(&mut schema, &stats, &placeholder_policy)
193 } else {
194 0
195 };
196 if replacements_added > 0 {
197 info!(
198 "Added {} NA placeholder replacement(s) to schema",
199 replacements_added
200 );
201 }
202
203 let mut yaml_output = if preview_requested || diff_requested {
204 Some(
205 schema
206 .to_yaml_string(args.replace_template)
207 .with_context(|| "Serializing inferred schema to YAML".to_string())?,
208 )
209 } else {
210 None
211 };
212
213 if preview_requested {
214 println!();
215 println!("Schema YAML Preview (not written):");
216 let yaml = yaml_output
217 .as_deref()
218 .expect("Preview requires serialized YAML output");
219 print!("{yaml}");
220 if !yaml.ends_with('\n') {
221 println!();
222 }
223 info!(
224 "Previewed schema for {} column(s) (no file written)",
225 schema.columns.len()
226 );
227 } else if should_write {
228 let output = required_output_path(
229 args.output.as_deref(),
230 "An --output path is required when writing an inferred schema",
231 )?;
232 if args.replace_template {
233 schema
234 .save_with_replace_template(output)
235 .with_context(|| format!("Writing schema to {output:?}"))?;
236 } else {
237 schema
238 .save(output)
239 .with_context(|| format!("Writing schema to {output:?}"))?;
240 }
241 info!(
242 "Inferred schema for {} column(s) written to {:?}",
243 schema.columns.len(),
244 output
245 );
246 } else {
247 info!(
248 "Inferred schema for {} column(s) (no output file written)",
249 schema.columns.len()
250 );
251 }
252
253 if let Some((diff_path, existing_content)) = &diff_request {
254 if yaml_output.is_none() {
255 yaml_output = Some(
256 schema
257 .to_yaml_string(args.replace_template)
258 .with_context(|| "Serializing inferred schema to YAML".to_string())?,
259 );
260 }
261 let new_yaml = yaml_output
262 .as_ref()
263 .expect("Diff requires serialized YAML output");
264 println!();
265 if existing_content == new_yaml {
266 println!(
267 "Schema Diff vs {}: no changes detected.",
268 diff_path.display()
269 );
270 } else {
271 println!("Schema Diff vs {}:", diff_path.display());
272 let diff = TextDiff::from_lines(existing_content, new_yaml);
273 let diff_text = diff
274 .unified_diff()
275 .context_radius(3)
276 .header(&format!("{}", diff_path.display()), "(inferred)")
277 .to_string();
278 if diff_text.is_empty() {
279 println!("(differences detected, but diff output was empty)");
280 } else {
281 print!("{diff_text}");
282 if !diff_text.ends_with('\n') {
283 println!();
284 }
285 }
286 }
287 }
288
289 if probe.mapping {
290 if preview_requested {
291 println!();
292 }
293 emit_mappings(&schema);
294 }
295
296 Ok(())
297}
298
299fn execute_verify(args: &SchemaVerifyArgs) -> Result<()> {
300 verify::execute(args)
301}
302
303fn execute_columns(args: &SchemaColumnsArgs) -> Result<()> {
304 columns::execute(args)
305}
306
307fn required_output_path<'a>(output: Option<&'a Path>, message: &str) -> Result<&'a Path> {
308 output.ok_or_else(|| anyhow!(message.to_string()))
309}
310
311fn parse_columns(specs: &[String]) -> Result<Vec<ColumnMeta>> {
312 let mut columns = Vec::new();
313 let mut seen = HashSet::new();
314 let mut output_names = HashSet::new();
315
316 for raw in specs {
317 for token in raw.split(',') {
318 let token = token.trim();
319 if token.is_empty() {
320 continue;
321 }
322 let (name_part, type_part) = token.split_once(':').ok_or_else(|| {
323 anyhow!("Column definition '{token}' must use the form name:type")
324 })?;
325
326 let name = name_part.trim();
327 if name.is_empty() {
328 return Err(anyhow!(
329 "Column name cannot be empty in definition '{token}'"
330 ));
331 }
332 if !seen.insert(name.to_string()) {
333 return Err(anyhow!("Duplicate column name '{name}' provided"));
334 }
335
336 let (type_raw, rename_raw) = if let Some((ty, rename)) = type_part.split_once("->") {
337 (ty, Some(rename))
338 } else {
339 (type_part, None)
340 };
341
342 let column_type = ColumnType::from_str(type_raw.trim())
343 .map_err(|err| anyhow!("Column '{name}' has invalid type '{type_part}': {err}"))?;
344
345 let rename = rename_raw
346 .map(|value| value.trim())
347 .filter(|value| !value.is_empty())
348 .map(|value| value.to_string());
349
350 if let Some(ref alias) = rename {
351 if alias != name && seen.contains(alias) {
352 return Err(anyhow!(
353 "Output name '{alias}' conflicts with an existing column name"
354 ));
355 }
356 if !output_names.insert(alias.clone()) {
357 return Err(anyhow!("Duplicate output column name '{alias}' provided"));
358 }
359 }
360
361 if rename.is_none() {
362 output_names.insert(name.to_string());
363 }
364
365 columns.push(ColumnMeta {
366 name: name.to_string(),
367 datatype: column_type,
368 rename,
369 value_replacements: Vec::new(),
370 datatype_mappings: Vec::new(),
371 });
372 }
373 }
374
375 if columns.is_empty() {
376 return Err(anyhow!("At least one --column definition is required"));
377 }
378
379 Ok(columns)
380}
381
382fn apply_replacements(columns: &mut [ColumnMeta], specs: &[String]) -> Result<()> {
383 if specs.is_empty() {
384 return Ok(());
385 }
386 let mut lookup = HashSet::new();
387 for column in columns.iter() {
388 lookup.insert(column.name.clone());
389 }
390
391 for raw in specs {
392 let spec = raw.trim();
393 if spec.is_empty() {
394 continue;
395 }
396 let (column_name, mapping) = spec.split_once('=').ok_or_else(|| {
397 anyhow!("Replacement '{spec}' must use the form column=value->new_value")
398 })?;
399 let column_name = column_name.trim();
400 if column_name.is_empty() {
401 return Err(anyhow!("Replacement '{spec}' is missing a column name"));
402 }
403 if !lookup.contains(column_name) {
404 return Err(anyhow!(
405 "Replacement references unknown column '{column_name}'"
406 ));
407 }
408 let (from_raw, to_raw) = mapping.split_once("->").ok_or_else(|| {
409 anyhow!(
410 "Replacement '{spec}' must include '->' to separate original and replacement values"
411 )
412 })?;
413 let from = from_raw.trim().to_string();
414 let to = to_raw.trim().to_string();
415 let column = columns
416 .iter_mut()
417 .find(|c| c.name == column_name)
418 .expect("column should exist");
419 if let Some(existing) = column
420 .value_replacements
421 .iter()
422 .position(|r| r.from == from)
423 {
424 column.value_replacements.remove(existing);
425 }
426 column
427 .value_replacements
428 .push(ValueReplacement { from, to });
429 }
430
431 Ok(())
432}
433
434fn apply_overrides(schema: &mut Schema, overrides: &[String]) -> Result<HashSet<String>> {
435 if overrides.is_empty() {
436 return Ok(HashSet::new());
437 }
438
439 let mut seen = HashSet::new();
440 let mut applied = HashSet::new();
441 for raw in overrides {
442 let spec = raw.trim();
443 if spec.is_empty() {
444 continue;
445 }
446 let (name_part, type_part) = spec
447 .split_once(':')
448 .ok_or_else(|| anyhow!("Override '{spec}' must use the form name:type"))?;
449 let name = name_part.trim();
450 if name.is_empty() {
451 return Err(anyhow!("Override '{spec}' is missing a column name"));
452 }
453 if !seen.insert(name.to_string()) {
454 return Err(anyhow!("Duplicate override provided for column '{name}'"));
455 }
456
457 let override_type = ColumnType::from_str(type_part.trim()).with_context(|| {
458 format!("Override for column '{name}' has invalid type '{type_part}'")
459 })?;
460
461 let column = schema
462 .columns
463 .iter_mut()
464 .find(|col| col.name == name)
465 .ok_or_else(|| anyhow!("Override references unknown column '{name}'"))?;
466 column.datatype = override_type;
467 applied.insert(name.to_string());
468 }
469
470 Ok(applied)
471}
472
473fn apply_default_name_mappings(schema: &mut Schema) -> HashSet<String> {
474 let mut suggested = HashSet::new();
475 for column in &mut schema.columns {
476 if column.rename.is_none() {
477 let suggestion = to_lower_snake_case(&column.name);
478 column.rename = Some(suggestion);
479 suggested.insert(column.name.clone());
480 }
481 }
482 suggested
483}
484
485fn render_probe_report(
486 schema: &Schema,
487 stats: &InferenceStats,
488 overrides: &HashSet<String>,
489 requested_sample_rows: usize,
490 placeholder_policy: &schema::PlaceholderPolicy,
491 suggested_renames: Option<&HashSet<String>>,
492) -> String {
493 if schema.columns.is_empty() {
494 return "No columns inferred.\n".to_string();
495 }
496 let rows_read = stats.rows_read();
497 let headers = vec![
498 "#".to_string(),
499 "name".to_string(),
500 "type".to_string(),
501 "rename".to_string(),
502 "override".to_string(),
503 "sample".to_string(),
504 "format".to_string(),
505 "observations".to_string(),
506 ];
507 let mut rows = Vec::with_capacity(schema.columns.len());
508 for (idx, column) in schema.columns.iter().enumerate() {
509 let rename_display = column
510 .rename
511 .as_deref()
512 .filter(|value| !value.is_empty())
513 .map(|value| {
514 if suggested_renames.is_some_and(|set| set.contains(&column.name)) {
515 format!("{value} (suggested)")
516 } else {
517 value.to_string()
518 }
519 })
520 .unwrap_or_else(|| "—".to_string());
521 let mut status_flags = Vec::new();
522 if overrides.contains(&column.name) {
523 status_flags.push("type");
524 }
525 if column.rename.is_some() {
526 status_flags.push("mapping");
527 }
528 let status_display = if status_flags.is_empty() {
529 "—".to_string()
530 } else {
531 status_flags.join("+")
532 };
533 let sample_display = stats
534 .sample_value(idx)
535 .map(truncate_sample)
536 .unwrap_or_else(|| "—".to_string());
537 let format_display = schema::format_hint_for(&column.datatype, stats.sample_value(idx))
538 .unwrap_or_else(|| "—".to_string());
539 let observation_display = column_observation_summary(stats, idx, rows_read);
540 rows.push(vec![
541 (idx + 1).to_string(),
542 column.name.clone(),
543 column.datatype.to_string(),
544 rename_display,
545 status_display,
546 sample_display,
547 format_display,
548 observation_display,
549 ]);
550 }
551 let mut output = table::render_table(&headers, &rows);
552
553 if requested_sample_rows == 0 {
554 output.push_str(&format!("\nSampled {rows_read} row(s) (full scan).\n"));
555 } else if rows_read >= requested_sample_rows {
556 output.push_str(&format!(
557 "\nSampled {rows_read} row(s) (requested limit {requested_sample_rows}).\n"
558 ));
559 } else {
560 output.push_str(&format!(
561 "\nSampled {rows_read} row(s) out of requested {requested_sample_rows}.\n"
562 ));
563 }
564 if stats.decode_errors() > 0 {
565 output.push_str(&format!(
566 "Skipped {} value(s) due to decoding errors.\n",
567 stats.decode_errors()
568 ));
569 } else {
570 output.push_str("No decoding errors encountered.\n");
571 }
572
573 let signature = compute_schema_signature(schema);
574 output.push_str(&format!("Header+Type Hash: {signature}\n"));
575
576 if let Some(section) = render_placeholder_section(schema, stats, placeholder_policy) {
577 output.push_str(§ion);
578 }
579
580 output
581}
582
/// Shortens `value` to at most 32 characters for display.
///
/// Values longer than the limit are cut after 32 characters and end with `…`;
/// shorter values are returned unchanged. The limit counts `char`s, not bytes.
fn truncate_sample(value: &str) -> String {
    const LIMIT: usize = 32;

    let mut remaining = value.chars();
    let mut shortened: String = remaining.by_ref().take(LIMIT).collect();
    // Anything left over means the value exceeded the limit.
    if remaining.next().is_some() {
        shortened.push('…');
    }
    shortened
}
595
596fn summarize_histogram_value(value: &str) -> String {
597 let mut sanitized = String::with_capacity(value.len());
598 for ch in value.chars() {
599 match ch {
600 '\n' | '\r' | '\t' => sanitized.push(' '),
601 _ => sanitized.push(ch),
602 }
603 }
604 truncate_sample(&sanitized)
605}
606
607fn column_observation_summary(
608 stats: &InferenceStats,
609 column_index: usize,
610 rows_read: usize,
611) -> String {
612 let mut fragments = Vec::new();
613 if let Some(summary) = stats.summary(column_index) {
614 fragments.push(format!("non_empty={}", summary.non_empty));
615 let empty = rows_read.saturating_sub(summary.non_empty);
616 if rows_read > 0 && empty > 0 {
617 fragments.push(format!("empty={empty}"));
618 }
619 if !summary.tracked_values.is_empty() {
620 let histogram = summary
621 .tracked_values
622 .iter()
623 .take(3)
624 .map(|(value, count)| {
625 let display = summarize_histogram_value(value);
626 format!("{display} ({count})")
627 })
628 .collect::<Vec<_>>()
629 .join(", ");
630 fragments.push(format!("samples=[{histogram}]"));
631 if summary.tracked_values.len() > 3 {
632 fragments.push("samples+=...".to_string());
633 }
634 }
635 if summary.other_values > 0 {
636 fragments.push(format!("others={}", summary.other_values));
637 }
638 }
639
640 if let Some(placeholders) = stats.placeholder_summary(column_index) {
641 let entries = placeholders.entries();
642 if !entries.is_empty() {
643 let tokens = entries
644 .iter()
645 .take(3)
646 .map(|(token, count)| {
647 let display = truncate_sample(token);
648 format!("{display} ({count})")
649 })
650 .collect::<Vec<_>>()
651 .join(", ");
652 fragments.push(format!("placeholders=[{tokens}]"));
653 if entries.len() > 3 {
654 fragments.push("placeholders+=...".to_string());
655 }
656 }
657 }
658
659 if fragments.is_empty() {
660 "—".to_string()
661 } else {
662 fragments.join("; ")
663 }
664}
665
666fn render_placeholder_section(
667 schema: &Schema,
668 stats: &InferenceStats,
669 placeholder_policy: &schema::PlaceholderPolicy,
670) -> Option<String> {
671 let mut blocks = Vec::new();
672 for (idx, column) in schema.columns.iter().enumerate() {
673 let Some(summary) = stats.placeholder_summary(idx) else {
674 continue;
675 };
676 let entries = summary.entries();
677 if entries.is_empty() {
678 continue;
679 }
680 let mut block = String::new();
681 let type_note = if column.datatype != ColumnType::String {
682 " (non-string)"
683 } else {
684 ""
685 };
686 block.push_str(&format!(
687 " • {} ({}{})\n",
688 column.name, column.datatype, type_note
689 ));
690 let tokens = entries
691 .iter()
692 .map(|(token, count)| format!("{token} ({count})"))
693 .collect::<Vec<_>>()
694 .join(", ");
695 block.push_str(&format!(" tokens: {tokens}\n"));
696 block.push_str(" replacements:\n");
697 let target_display = match placeholder_policy {
698 schema::PlaceholderPolicy::TreatAsEmpty => "\"\"".to_string(),
699 schema::PlaceholderPolicy::FillWith(value) => format!("\"{value}\""),
700 };
701 for (token, _) in entries {
702 block.push_str(&format!(
703 " - from \"{token}\" -> to {target_display}\n"
704 ));
705 }
706 blocks.push(block);
707 }
708
709 if blocks.is_empty() {
710 return None;
711 }
712
713 let mut section = match placeholder_policy {
714 schema::PlaceholderPolicy::TreatAsEmpty => {
715 "\nPlaceholder Suggestions (replace with empty string):\n".to_string()
716 }
717 schema::PlaceholderPolicy::FillWith(value) => {
718 format!("\nPlaceholder Suggestions (replace with '{value}'):\n")
719 }
720 };
721 for block in blocks {
722 section.push_str(&block);
723 }
724 Some(section)
725}
726
727fn compute_schema_signature(schema: &Schema) -> String {
728 let mut hasher = Sha256::new();
729 for column in &schema.columns {
730 hasher.update(column.name.as_bytes());
731 hasher.update(b":");
732 hasher.update(column.datatype.signature_token().as_bytes());
733 hasher.update(b";");
734 }
735 format!("{:x}", hasher.finalize())
736}
737
738fn emit_mappings(schema: &Schema) {
739 if schema.columns.is_empty() {
740 println!("No columns found to emit mappings.");
741 return;
742 }
743 let mut rows = Vec::with_capacity(schema.columns.len());
744 for (idx, column) in schema.columns.iter().enumerate() {
745 let mapping = format!("{}:{}->", column.name, column.datatype.cli_token());
746 let suggested = column
747 .rename
748 .as_ref()
749 .filter(|value| !value.is_empty())
750 .cloned()
751 .unwrap_or_else(|| to_lower_snake_case(&column.name));
752 rows.push(vec![
753 (idx + 1).to_string(),
754 column.name.clone(),
755 column.datatype.to_string(),
756 mapping,
757 suggested,
758 ]);
759 }
760 let headers = vec![
761 "#".to_string(),
762 "name".to_string(),
763 "type".to_string(),
764 "mapping".to_string(),
765 "suggested".to_string(),
766 ];
767 table::print_table(&headers, &rows);
768}
769
770fn handle_snapshot(report: &str, snapshot_path: Option<&Path>) -> Result<()> {
771 let Some(path) = snapshot_path else {
772 return Ok(());
773 };
774
775 if path.exists() {
776 let expected =
777 fs::read_to_string(path).with_context(|| format!("Reading snapshot from {path:?}"))?;
778 if expected != report {
779 return Err(anyhow!(
780 "Probe output does not match snapshot at {path:?}. Inspect differences and update the snapshot if the change is intentional."
781 ));
782 }
783 } else {
784 if let Some(parent) = path.parent().filter(|p| !p.as_os_str().is_empty()) {
785 fs::create_dir_all(parent)
786 .with_context(|| format!("Creating snapshot directory {parent:?}"))?;
787 }
788 fs::write(path, report).with_context(|| format!("Writing snapshot to {path:?}"))?;
789 eprintln!("Snapshot captured at {path:?}");
790 }
791
792 Ok(())
793}
794
/// Converts a column name to lower_snake_case.
///
/// Only ASCII letters and digits are kept (lowercased). Every run of other
/// characters becomes a single `_`, except at the start and end where it is
/// dropped. An underscore is also inserted before an uppercase letter that
/// follows a lowercase letter or digit, and before the last uppercase letter of
/// an acronym when a lowercase letter follows (`HTTPStatus` -> `http_status`).
///
/// If nothing survives (no ASCII alphanumerics at all), the input is returned
/// ASCII-lowercased instead.
fn to_lower_snake_case(value: &str) -> String {
    let mut snake = String::new();
    let mut symbols = value.chars().peekable();
    // Whether the previously processed character was an ASCII letter/digit,
    // and whether it was an uppercase letter.
    let mut prev_alnum = false;
    let mut prev_upper = false;

    while let Some(current) = symbols.next() {
        if !current.is_ascii_alphanumeric() {
            // Collapse separator runs and never start the result with '_'.
            if !snake.is_empty() && !snake.ends_with('_') {
                snake.push('_');
            }
            prev_alnum = false;
            prev_upper = false;
            continue;
        }

        let is_upper = current.is_ascii_uppercase();
        if is_upper && prev_alnum {
            let lowercase_follows = symbols.peek().is_some_and(|c| c.is_ascii_lowercase());
            // Word boundary: "aB" always splits; inside an acronym only "ABc" splits before B.
            if !prev_upper || lowercase_follows {
                snake.push('_');
            }
        }
        snake.push(current.to_ascii_lowercase());
        prev_alnum = true;
        prev_upper = is_upper;
    }

    let kept = snake.trim_end_matches('_').len();
    snake.truncate(kept);

    if snake.is_empty() {
        value.to_ascii_lowercase()
    } else {
        snake
    }
}
841
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns string literals into the owned spec list the parsers expect.
    fn owned(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// Builds a column without replacements or datatype mappings.
    fn column(name: &str, datatype: ColumnType, rename: Option<&str>) -> ColumnMeta {
        ColumnMeta {
            name: name.to_string(),
            datatype,
            rename: rename.map(str::to_string),
            value_replacements: Vec::new(),
            datatype_mappings: Vec::new(),
        }
    }

    /// Wraps columns in a header-bearing schema without a version.
    fn schema_with(columns: Vec<ColumnMeta>) -> Schema {
        Schema {
            columns,
            schema_version: None,
            has_headers: true,
        }
    }

    #[test]
    fn parse_columns_accepts_comma_and_repeats() {
        let columns =
            parse_columns(&owned(&["id:integer,name:string", "amount:float"])).expect("parsed");

        let names: Vec<&str> = columns.iter().map(|c| c.name.as_str()).collect();
        assert_eq!(columns.len(), 3);
        assert_eq!(names, ["id", "name", "amount"]);
        assert_eq!(columns[0].datatype, ColumnType::Integer);
        assert_eq!(columns[1].datatype, ColumnType::String);
        assert_eq!(columns[2].datatype, ColumnType::Float);
    }

    #[test]
    fn duplicate_columns_are_rejected() {
        let err = parse_columns(&owned(&["id:integer,id:string"])).unwrap_err();
        assert!(err.to_string().contains("Duplicate column name"));
    }

    #[test]
    fn missing_type_is_rejected() {
        let err = parse_columns(&owned(&["id"])).unwrap_err();
        assert!(err.to_string().contains("must use the form"));
    }

    #[test]
    fn parse_columns_supports_output_rename() {
        let columns =
            parse_columns(&owned(&["id:integer->Identifier,name:string"])).expect("parsed");
        assert_eq!(columns.len(), 2);
        assert_eq!(columns[0].rename.as_deref(), Some("Identifier"));
        assert!(columns[1].rename.is_none());
    }

    #[test]
    fn duplicate_output_names_are_rejected() {
        let err = parse_columns(&owned(&[
            "id:integer->Identifier",
            "code:string->Identifier",
        ]))
        .unwrap_err();
        assert!(err.to_string().contains("Duplicate output column name"));
    }

    #[test]
    fn replacements_apply_to_columns() {
        let mut columns = parse_columns(&owned(&["status:string"])).expect("parsed");
        apply_replacements(&mut columns, &owned(&["status=pending->shipped"])).expect("applied");

        let replacements = &columns[0].value_replacements;
        assert_eq!(replacements.len(), 1);
        assert_eq!(replacements[0].from, "pending");
        assert_eq!(replacements[0].to, "shipped");
    }

    #[test]
    fn replacements_validate_column_names() {
        let mut columns = parse_columns(&owned(&["status:string"])).expect("parsed");
        let err =
            apply_replacements(&mut columns, &owned(&["missing=pending->shipped"])).unwrap_err();
        assert!(err.to_string().contains("unknown column"));
    }

    #[test]
    fn to_lower_snake_case_converts_names() {
        let cases = [
            ("OrderDate", "order_date"),
            ("customer-name", "customer_name"),
            ("customer name", "customer_name"),
            ("APIKey", "api_key"),
            ("HTTPStatus", "http_status"),
        ];
        for (input, expected) in cases {
            assert_eq!(to_lower_snake_case(input), expected);
        }
    }

    #[test]
    fn apply_overrides_updates_types() {
        let mut schema = schema_with(vec![column("amount", ColumnType::Float, None)]);
        // The blank entry must be skipped rather than rejected.
        let applied = apply_overrides(&mut schema, &owned(&["amount:integer", ""])).unwrap();
        assert_eq!(schema.columns[0].datatype, ColumnType::Integer);
        assert!(applied.contains("amount"));
    }

    #[test]
    fn apply_default_name_mappings_returns_suggested_set() {
        let mut schema = schema_with(vec![
            column("OrderID", ColumnType::Integer, None),
            column("CustomerName", ColumnType::String, Some("customer_name")),
        ]);

        let suggested = apply_default_name_mappings(&mut schema);

        assert_eq!(schema.columns[0].rename.as_deref(), Some("order_id"));
        assert_eq!(schema.columns[1].rename.as_deref(), Some("customer_name"));
        assert!(suggested.contains("OrderID"));
        assert!(!suggested.contains("CustomerName"));
    }
}