1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, HashSet},
4 fmt,
5 fs::File,
6 io::BufReader,
7 path::Path,
8 str::FromStr,
9};
10
11use anyhow::{Context, Result, anyhow, bail, ensure};
12use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
13use encoding_rs::Encoding;
14use rust_decimal::Decimal;
15use rust_decimal::prelude::FromPrimitive;
16use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
17use serde_yaml::Value;
18use uuid::Uuid;
19
20use crate::{
21 data::{
22 CurrencyValue, FixedDecimalValue, Value as DataValue, parse_currency_decimal,
23 parse_decimal_literal, parse_naive_date, parse_naive_datetime, parse_naive_time,
24 parse_typed_value,
25 },
26 io_utils,
27};
28
/// Largest precision accepted for a decimal column; `DecimalSpec::ensure_valid`
/// rejects any precision or scale above this bound.
const DECIMAL_MAX_PRECISION: u32 = 28;
// NOTE(review): the three header-detection constants below are not referenced in the
// part of the file reviewed here; the descriptions are inferred from the names only —
// confirm against their use sites.
/// Presumably the percentage of columns that must match a header alias.
const HEADER_ALIAS_THRESHOLD_PERCENT: usize = 80;
/// Presumably the minimum number of alias matches needed to treat a row as a header.
const HEADER_ALIAS_MIN_MATCHES: usize = 4;
/// Presumably the number of leading rows sampled during header detection.
const HEADER_DETECTION_SAMPLE_ROWS: usize = 6;
33
/// Lower-case, alphabetically ordered words that commonly appear as CSV column names.
///
/// NOTE(review): not referenced in the part of the file reviewed here; presumably
/// consulted by header detection — confirm against the use site.
const COMMON_HEADER_TOKENS: &[&str] = &[
    "address",
    "amount",
    "category",
    "city",
    "code",
    "country",
    "created",
    "currency",
    "date",
    "description",
    "email",
    "first_name",
    "id",
    "item",
    "last_name",
    "name",
    "phone",
    "price",
    "quantity",
    "state",
    "status",
    "total",
    "type",
    "updated",
    "zip",
];
61
/// Column layout of a CSV input: the header names plus a flag telling whether
/// the file carries a header row.
#[derive(Debug, Clone)]
pub struct CsvLayout {
    /// One entry per column, in file order.
    pub headers: Vec<String>,
    /// Whether the input has a header row.
    pub has_headers: bool,
}

impl CsvLayout {
    /// Returns the number of columns, i.e. the number of entries in `headers`.
    pub fn field_count(&self) -> usize {
        let Self { headers, .. } = self;
        headers.len()
    }
}
73
/// Policy describing what to do with placeholder tokens found in the data.
///
/// NOTE(review): the code that applies this policy is not part of the section
/// reviewed here; the variant descriptions follow the variant names — confirm.
#[derive(Debug, Clone, Default)]
pub enum PlaceholderPolicy {
    /// Default policy (selected by `#[default]`).
    #[default]
    TreatAsEmpty,
    /// Carries the replacement text to use.
    FillWith(String),
}
80
/// Occurrence counter for placeholder tokens, keyed by the trimmed token text.
#[derive(Debug, Clone, Default)]
pub struct PlaceholderSummary {
    /// Trimmed token -> number of times it was recorded (kept in key order).
    counts: BTreeMap<String, usize>,
}

impl PlaceholderSummary {
    /// Returns `true` when no token has been recorded yet.
    pub fn is_empty(&self) -> bool {
        self.counts.is_empty()
    }

    /// Counts one occurrence of `value`.
    ///
    /// Surrounding whitespace is stripped first; values that are empty after
    /// trimming are ignored.
    pub fn record(&mut self, value: &str) {
        let token = value.trim();
        if !token.is_empty() {
            let slot = self.counts.entry(token.to_owned()).or_default();
            *slot += 1;
        }
    }

    /// Returns every recorded token with its count, ordered by token.
    pub fn entries(&self) -> Vec<(String, usize)> {
        let mut listed = Vec::with_capacity(self.counts.len());
        for (token, count) in &self.counts {
            listed.push((token.clone(), *count));
        }
        listed
    }
}
106
107#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
108pub struct DecimalSpec {
109 pub precision: u32,
110 pub scale: u32,
111}
112
113impl DecimalSpec {
114 pub fn new(precision: u32, scale: u32) -> Result<Self> {
115 let spec = Self { precision, scale };
116 spec.ensure_valid()?;
117 Ok(spec)
118 }
119
120 pub fn ensure_valid(&self) -> Result<()> {
121 ensure!(self.precision > 0, "Decimal precision must be positive");
122 ensure!(
123 self.precision <= DECIMAL_MAX_PRECISION,
124 "Decimal precision must be <= {}",
125 DECIMAL_MAX_PRECISION
126 );
127 ensure!(
128 self.scale <= self.precision,
129 "Decimal scale ({}) cannot exceed precision ({})",
130 self.scale,
131 self.precision
132 );
133 ensure!(
134 self.scale <= DECIMAL_MAX_PRECISION,
135 "Decimal scale must be <= {}",
136 DECIMAL_MAX_PRECISION
137 );
138 Ok(())
139 }
140
141 pub fn signature(&self) -> String {
142 format!("decimal({},{})", self.precision, self.scale)
143 }
144
145 pub fn describe(&self) -> String {
146 format!("decimal(precision={},scale={})", self.precision, self.scale)
147 }
148}
149
/// Datatype that can be assigned to a schema column.
///
/// Every variant is a plain tag except [`ColumnType::Decimal`], which carries
/// the precision/scale of the fixed-point representation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnType {
    String,
    Integer,
    Float,
    Boolean,
    Date,
    DateTime,
    Time,
    Guid,
    Currency,
    /// Fixed-point decimal with an explicit precision and scale.
    Decimal(DecimalSpec),
}
163
164impl Serialize for ColumnType {
165 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
166 where
167 S: Serializer,
168 {
169 match self {
170 ColumnType::String => serializer.serialize_str("String"),
171 ColumnType::Integer => serializer.serialize_str("Integer"),
172 ColumnType::Float => serializer.serialize_str("Float"),
173 ColumnType::Boolean => serializer.serialize_str("Boolean"),
174 ColumnType::Date => serializer.serialize_str("Date"),
175 ColumnType::DateTime => serializer.serialize_str("DateTime"),
176 ColumnType::Time => serializer.serialize_str("Time"),
177 ColumnType::Guid => serializer.serialize_str("Guid"),
178 ColumnType::Currency => serializer.serialize_str("Currency"),
179 ColumnType::Decimal(spec) => serializer.serialize_str(&spec.signature()),
180 }
181 }
182}
183
184impl<'de> Deserialize<'de> for ColumnType {
185 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
186 where
187 D: Deserializer<'de>,
188 {
189 let human_readable = deserializer.is_human_readable();
190 #[cfg(test)]
191 {
192 if !human_readable && std::env::var("CSV_MANAGED_DEBUG_COLUMN_TYPE").is_ok() {
193 eprintln!("ColumnType binary deserialize invoked");
194 }
195 }
196 if human_readable {
197 let value = serde_yaml::Value::deserialize(deserializer)?;
198 parse_human_readable_column_type(value).map_err(de::Error::custom)
199 } else {
200 let token = String::deserialize(deserializer)?;
201 ColumnType::from_str(&token).map_err(|err| de::Error::custom(err.to_string()))
202 }
203 }
204}
205
206fn parse_decimal_from_mapping(value: serde_yaml::Value) -> Result<ColumnType> {
207 let mapping = value
208 .as_mapping()
209 .ok_or_else(|| anyhow!("Decimal mapping must be a map with precision/scale"))?;
210
211 let mut precision: Option<u32> = None;
212 let mut scale: Option<u32> = None;
213
214 for (key, val) in mapping {
215 let key_str = key
216 .as_str()
217 .ok_or_else(|| anyhow!("Decimal mapping keys must be strings"))?
218 .to_ascii_lowercase();
219
220 match key_str.as_str() {
221 "precision" => {
222 let parsed = val
223 .as_u64()
224 .ok_or_else(|| anyhow!("Decimal precision must be an unsigned integer"))?;
225 precision = Some(parsed as u32);
226 }
227 "scale" => {
228 let parsed = val
229 .as_u64()
230 .ok_or_else(|| anyhow!("Decimal scale must be an unsigned integer"))?;
231 scale = Some(parsed as u32);
232 }
233 other => {
234 return Err(anyhow!("Unknown decimal key '{other}'"));
235 }
236 }
237 }
238
239 let precision = precision.ok_or_else(|| anyhow!("Decimal mapping requires precision"))?;
240 let scale = scale.ok_or_else(|| anyhow!("Decimal mapping requires scale"))?;
241 let spec = DecimalSpec::new(precision, scale)?;
242 Ok(ColumnType::Decimal(spec))
243}
244
245fn parse_human_readable_column_type(value: serde_yaml::Value) -> Result<ColumnType> {
246 if let Some(token) = value.as_str() {
247 return ColumnType::from_str(token);
248 }
249
250 if let Some(mapping) = value.as_mapping()
251 && mapping.len() == 1
252 && let Some((key, val)) = mapping.iter().next()
253 {
254 let key_normalized = key
255 .as_str()
256 .ok_or_else(|| anyhow!("Structured datatype key must be a string"))?
257 .trim()
258 .to_ascii_lowercase();
259 return match key_normalized.as_str() {
260 "decimal" => parse_decimal_from_mapping(val.clone()),
261 other => Err(anyhow!("Unsupported structured datatype '{other}'")),
262 };
263 }
264
265 Err(anyhow!(
266 "Unsupported column datatype representation: {value:?}"
267 ))
268}
269
270impl ColumnType {
271 pub fn as_str(&self) -> &'static str {
272 match self {
273 ColumnType::String => "string",
274 ColumnType::Integer => "integer",
275 ColumnType::Float => "float",
276 ColumnType::Boolean => "boolean",
277 ColumnType::Date => "date",
278 ColumnType::DateTime => "datetime",
279 ColumnType::Time => "time",
280 ColumnType::Guid => "guid",
281 ColumnType::Currency => "currency",
282 ColumnType::Decimal(_) => "decimal",
283 }
284 }
285
286 pub fn variants() -> &'static [&'static str] {
287 &[
288 "string",
289 "integer",
290 "float",
291 "boolean",
292 "date",
293 "datetime",
294 "time",
295 "guid",
296 "currency",
297 "decimal(precision,scale)",
298 ]
299 }
300
301 pub fn describe(&self) -> String {
302 match self {
303 ColumnType::Decimal(spec) => spec.describe(),
304 _ => self.as_str().to_string(),
305 }
306 }
307
308 pub fn signature_token(&self) -> String {
309 match self {
310 ColumnType::Decimal(spec) => spec.signature(),
311 _ => self.as_str().to_string(),
312 }
313 }
314
315 pub fn cli_token(&self) -> String {
316 match self {
317 ColumnType::Decimal(spec) => format!("decimal({},{})", spec.precision, spec.scale),
318 _ => self.as_str().to_string(),
319 }
320 }
321
322 pub fn decimal_spec(&self) -> Option<&DecimalSpec> {
323 match self {
324 ColumnType::Decimal(spec) => Some(spec),
325 _ => None,
326 }
327 }
328}
329
330impl fmt::Display for ColumnType {
331 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332 write!(f, "{}", self.describe())
333 }
334}
335
336impl std::str::FromStr for ColumnType {
337 type Err = anyhow::Error;
338
339 fn from_str(value: &str) -> Result<Self, Self::Err> {
340 let normalized = value.trim().to_ascii_lowercase();
341 match normalized.as_str() {
342 "string" => Ok(ColumnType::String),
343 "integer" | "int" => Ok(ColumnType::Integer),
344 "float" | "double" => Ok(ColumnType::Float),
345 "boolean" | "bool" => Ok(ColumnType::Boolean),
346 "date" => Ok(ColumnType::Date),
347 "datetime" | "date-time" | "timestamp" => Ok(ColumnType::DateTime),
348 "time" => Ok(ColumnType::Time),
349 "guid" | "uuid" => Ok(ColumnType::Guid),
350 "currency" => Ok(ColumnType::Currency),
351 other if other.starts_with("decimal") => parse_decimal_type(value),
352 _ => Err(anyhow!(
353 "Unknown column type '{value}'. Supported types: {}",
354 ColumnType::variants().join(", ")
355 )),
356 }
357 }
358}
359
360fn parse_decimal_type(value: &str) -> Result<ColumnType> {
361 let trimmed = value.trim();
362 let start = trimmed.find('(').ok_or_else(|| {
363 anyhow!("Decimal type must specify precision and scale, e.g. decimal(18,4)")
364 })?;
365 ensure!(
366 trimmed.ends_with(')'),
367 "Decimal type must close with ')', e.g. decimal(18,4)"
368 );
369 let inner = &trimmed[start + 1..trimmed.len() - 1];
370 let mut precision: Option<u32> = None;
371 let mut scale: Option<u32> = None;
372 let mut positional = Vec::new();
373
374 for part in inner.split(',') {
375 let token = part.trim();
376 if token.is_empty() {
377 continue;
378 }
379 if let Some((key, value)) = token
380 .split_once(['=', ':'])
381 .map(|(k, v)| (k.trim(), v.trim()))
382 {
383 let key_normalized = key.to_ascii_lowercase();
384 let parsed: u32 = value
385 .parse()
386 .with_context(|| format!("Parsing decimal {key}='{value}' in '{token}'"))?;
387 match key_normalized.as_str() {
388 "precision" => {
389 precision = Some(parsed);
390 }
391 "scale" => {
392 scale = Some(parsed);
393 }
394 other => {
395 bail!("Unknown decimal option '{other}' in '{token}'");
396 }
397 }
398 } else {
399 positional.push(token);
400 }
401 }
402
403 if let Some(first) = positional.first()
404 && precision.is_none()
405 {
406 precision =
407 Some(first.parse().with_context(|| {
408 format!("Parsing decimal precision from '{first}' in '{value}'")
409 })?);
410 }
411 if let Some(second) = positional.get(1)
412 && scale.is_none()
413 {
414 scale = Some(
415 second
416 .parse()
417 .with_context(|| format!("Parsing decimal scale from '{second}' in '{value}'"))?,
418 );
419 }
420 ensure!(
421 positional.len() <= 2,
422 "Decimal type accepts at most two positional arguments"
423 );
424
425 let precision = precision
426 .ok_or_else(|| anyhow!("Decimal type requires a precision value, e.g. decimal(18,4)"))?;
427 let scale =
428 scale.ok_or_else(|| anyhow!("Decimal type requires a scale value, e.g. decimal(18,4)"))?;
429
430 let spec = DecimalSpec::new(precision, scale)?;
431 Ok(ColumnType::Decimal(spec))
432}
433
/// A single value substitution rule: occurrences of `from` are replaced by `to`.
///
/// NOTE(review): the code applying the rule is not part of the section reviewed
/// here; matching details (trimming, case sensitivity) should be confirmed there.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ValueReplacement {
    /// Value to look for.
    pub from: String,
    /// Value to substitute.
    pub to: String,
}
439
/// One conversion step from a source datatype to a target datatype.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DatatypeMapping {
    /// Datatype the incoming value is interpreted as.
    pub from: ColumnType,
    /// Datatype the value is converted to.
    pub to: ColumnType,
    /// Optional strategy name (for example `round` or `trim`); omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub strategy: Option<String>,
    /// Free-form options; `format` and `scale` are the keys read by the
    /// conversion code in this file. Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub options: BTreeMap<String, Value>,
}
449
/// Schema entry describing one column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnMeta {
    /// Column name as it appears in the schema.
    pub name: String,
    /// Declared datatype of the column.
    pub datatype: ColumnType,
    /// Optional alternative name, stored under the `name_mapping` key and
    /// omitted from the serialized form when `None`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        rename = "name_mapping"
    )]
    pub rename: Option<String>,
    /// Value substitution rules, stored under the `replace` key (the key
    /// `value_replacements` is also accepted on input); omitted when empty.
    #[serde(
        default,
        rename = "replace",
        alias = "value_replacements",
        skip_serializing_if = "Vec::is_empty"
    )]
    pub value_replacements: Vec<ValueReplacement>,
    /// Datatype conversion steps for the column; omitted when empty.
    #[serde(
        default,
        rename = "datatype_mappings",
        skip_serializing_if = "Vec::is_empty"
    )]
    pub datatype_mappings: Vec<DatatypeMapping>,
}
474
/// Description of a CSV file: its ordered columns plus file-level settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Schema {
    /// Columns in file order.
    pub columns: Vec<ColumnMeta>,
    /// Optional version tag; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub schema_version: Option<String>,
    /// Whether the file has a header row; defaults to `true` when the key is
    /// absent (see `Schema::default_has_headers`).
    #[serde(default = "Schema::default_has_headers")]
    pub has_headers: bool,
}
483
/// Per-column value statistics.
///
/// NOTE(review): the code that fills this struct is not part of the section
/// reviewed here; the field descriptions are inferred from the names — confirm.
#[derive(Debug, Clone)]
pub struct ColumnSummary {
    /// Presumably the number of non-empty values seen in the column.
    pub non_empty: usize,
    /// Presumably `(value, occurrences)` pairs for the values tracked individually.
    pub tracked_values: Vec<(String, usize)>,
    /// Presumably the number of values not tracked individually.
    pub other_values: usize,
}
490
491#[derive(Debug, Clone)]
492pub struct InferenceStats {
493 sample_values: Vec<Option<String>>,
494 rows_read: usize,
495 requested_rows: usize,
496 decode_errors: usize,
497 summaries: Vec<ColumnSummary>,
498 placeholder_summaries: Vec<PlaceholderSummary>,
499}
500
501impl InferenceStats {
502 pub fn sample_value(&self, index: usize) -> Option<&str> {
503 self.sample_values
504 .get(index)
505 .and_then(|value| value.as_deref())
506 }
507
508 pub fn summary(&self, index: usize) -> Option<&ColumnSummary> {
509 self.summaries.get(index)
510 }
511
512 pub fn rows_read(&self) -> usize {
513 self.rows_read
514 }
515
516 pub fn requested_rows(&self) -> usize {
517 self.requested_rows
518 }
519
520 pub fn decode_errors(&self) -> usize {
521 self.decode_errors
522 }
523
524 pub fn placeholder_summary(&self, index: usize) -> Option<&PlaceholderSummary> {
525 self.placeholder_summaries.get(index)
526 }
527}
528
529impl Schema {
530 pub fn from_headers(headers: &[String]) -> Self {
531 let columns = headers
532 .iter()
533 .map(|name| ColumnMeta {
534 name: name.clone(),
535 datatype: ColumnType::String,
536 rename: None,
537 value_replacements: Vec::new(),
538 datatype_mappings: Vec::new(),
539 })
540 .collect();
541 Schema {
542 columns,
543 schema_version: None,
544 has_headers: true,
545 }
546 }
547
548 pub const fn default_has_headers() -> bool {
549 true
550 }
551
552 pub fn expects_headers(&self) -> bool {
553 self.has_headers
554 }
555
556 pub fn column_index(&self, name: &str) -> Option<usize> {
557 self.columns
558 .iter()
559 .position(|c| c.name == name || c.rename.as_deref() == Some(name))
560 }
561
562 pub fn headers(&self) -> Vec<String> {
563 self.columns.iter().map(|c| c.name.clone()).collect()
564 }
565
566 pub fn output_headers(&self) -> Vec<String> {
567 self.columns
568 .iter()
569 .map(|c| c.output_name().to_string())
570 .collect()
571 }
572
573 pub(crate) fn header_alias_sets(&self) -> Vec<HashSet<String>> {
574 self.columns
575 .iter()
576 .map(|column| build_header_aliases(&column.name))
577 .collect()
578 }
579
580 pub fn validate_headers(&self, headers: &[String]) -> Result<()> {
581 if !self.has_headers {
582 return Ok(());
583 }
584 if headers.len() != self.columns.len() {
585 return Err(anyhow!(
586 "Header length mismatch: schema expects {} column(s) but file contains {}",
587 self.columns.len(),
588 headers.len()
589 ));
590 }
591 for (idx, column) in self.columns.iter().enumerate() {
592 let name = headers.get(idx).map(|s| s.as_str()).unwrap_or_default();
593 if column.matches_header(name) {
594 continue;
595 }
596 if let Some(mapped) = column
597 .rename
598 .as_deref()
599 .filter(|value| !value.is_empty() && *value != column.name)
600 {
601 return Err(anyhow!(
602 "Header mismatch at position {}: expected '{}' (or mapped '{}') but found '{}'",
603 idx + 1,
604 column.name,
605 mapped,
606 name
607 ));
608 }
609 return Err(anyhow!(
610 "Header mismatch at position {}: expected '{}' but found '{}'",
611 idx + 1,
612 column.name,
613 name
614 ));
615 }
616 Ok(())
617 }
618
619 pub fn save(&self, path: &Path) -> Result<()> {
620 self.save_internal(path, false)
621 }
622
623 pub fn save_with_replace_template(&self, path: &Path) -> Result<()> {
624 self.save_internal(path, true)
625 }
626
627 pub fn to_yaml_string(&self, include_replace_template: bool) -> Result<String> {
628 let value = self.to_yaml_value(include_replace_template)?;
629 serde_yaml::to_string(&value).context("Serializing schema to YAML string")
630 }
631
632 pub fn load(path: &Path) -> Result<Self> {
633 let file = File::open(path).with_context(|| format!("Opening schema file {path:?}"))?;
634 let reader = BufReader::new(file);
635 let schema: Schema = serde_yaml::from_reader(reader).context("Parsing schema YAML")?;
636 schema.validate_datatype_mappings()?;
637 Ok(schema)
638 }
639
640 fn save_internal(&self, path: &Path, include_replace_template: bool) -> Result<()> {
641 let value = self.to_yaml_value(include_replace_template)?;
642 let file = File::create(path).with_context(|| format!("Creating schema file {path:?}"))?;
643 serde_yaml::to_writer(file, &value).context("Writing schema YAML")
644 }
645
646 fn to_yaml_value(&self, include_replace_template: bool) -> Result<Value> {
647 let mut schema = self.clone();
648 if schema.schema_version.is_none() {
649 schema.schema_version = Some(CURRENT_SCHEMA_VERSION.to_string());
650 }
651 schema.validate_datatype_mappings()?;
652
653 let mut value =
654 serde_yaml::to_value(&schema).context("Serializing schema to YAML value")?;
655 if include_replace_template
656 && let Some(columns) = value
657 .get_mut("columns")
658 .and_then(|columns| columns.as_sequence_mut())
659 {
660 for column in columns {
661 if let Some(obj) = column.as_mapping_mut() {
662 if let Some(existing) = obj.remove(Value::from("value_replacements")) {
663 obj.insert(Value::from("replace"), existing);
664 }
665 let replace_key = Value::from("replace");
666 if !obj.contains_key(&replace_key) {
667 obj.insert(replace_key, Value::Sequence(Vec::new()));
668 }
669 }
670 }
671 }
672 Ok(value)
673 }
674}
675
676fn parse_initial_value(raw: &str, mapping: &DatatypeMapping) -> Result<DataValue> {
677 match mapping.from {
678 ColumnType::String => Ok(DataValue::String(raw.to_string())),
679 _ => parse_with_type(raw, &mapping.from),
680 }
681}
682
683fn parse_with_type(value: &str, ty: &ColumnType) -> Result<DataValue> {
684 let trimmed = value.trim();
685 parse_typed_value(trimmed, ty)
686 .with_context(|| format!("Parsing '{trimmed}' as {ty}"))?
687 .ok_or_else(|| anyhow!("Value is empty after trimming"))
688}
689
690fn value_column_type(value: &DataValue) -> ColumnType {
691 match value {
692 DataValue::String(_) => ColumnType::String,
693 DataValue::Integer(_) => ColumnType::Integer,
694 DataValue::Float(_) => ColumnType::Float,
695 DataValue::Boolean(_) => ColumnType::Boolean,
696 DataValue::Date(_) => ColumnType::Date,
697 DataValue::DateTime(_) => ColumnType::DateTime,
698 DataValue::Time(_) => ColumnType::Time,
699 DataValue::Guid(_) => ColumnType::Guid,
700 DataValue::Decimal(value) => ColumnType::Decimal(
701 DecimalSpec::new(value.precision(), value.scale())
702 .expect("FixedDecimalValue guarantees valid decimal spec"),
703 ),
704 DataValue::Currency(_) => ColumnType::Currency,
705 }
706}
707
708fn apply_single_mapping(mapping: &DatatypeMapping, value: DataValue) -> Result<DataValue> {
709 let strategy = normalized_strategy(mapping);
710 match (&mapping.to, value) {
711 (ColumnType::String, DataValue::String(mut s)) => {
712 if let Some(strategy) = strategy.as_deref() {
713 match strategy {
714 "trim" => s = s.trim().to_string(),
715 "lowercase" => s = s.to_ascii_lowercase(),
716 "uppercase" => s = s.to_ascii_uppercase(),
717 other => {
718 bail!("Strategy '{other}' is not valid for string -> string mappings");
719 }
720 }
721 }
722 Ok(DataValue::String(s))
723 }
724 (ColumnType::String, DataValue::Integer(i)) => Ok(DataValue::String(i.to_string())),
725 (ColumnType::String, DataValue::Float(f)) => {
726 let scale = resolve_scale(mapping);
727 let formatted =
728 if strategy.as_deref() == Some("round") || mapping.from == ColumnType::Float {
729 format_float_with_scale(f, scale)
730 } else {
731 f.to_string()
732 };
733 Ok(DataValue::String(formatted))
734 }
735 (ColumnType::String, DataValue::Boolean(b)) => Ok(DataValue::String(b.to_string())),
736 (ColumnType::String, DataValue::Date(d)) => {
737 let fmt = mapping
738 .options
739 .get("format")
740 .and_then(|v| v.as_str())
741 .unwrap_or("%Y-%m-%d");
742 Ok(DataValue::String(d.format(fmt).to_string()))
743 }
744 (ColumnType::String, DataValue::DateTime(dt)) => {
745 let fmt = mapping
746 .options
747 .get("format")
748 .and_then(|v| v.as_str())
749 .unwrap_or("%Y-%m-%d %H:%M:%S");
750 Ok(DataValue::String(dt.format(fmt).to_string()))
751 }
752 (ColumnType::String, DataValue::Time(t)) => {
753 let fmt = mapping
754 .options
755 .get("format")
756 .and_then(|v| v.as_str())
757 .unwrap_or("%H:%M:%S");
758 Ok(DataValue::String(t.format(fmt).to_string()))
759 }
760 (ColumnType::String, DataValue::Guid(g)) => Ok(DataValue::String(g.to_string())),
761 (ColumnType::String, DataValue::Decimal(d)) => Ok(DataValue::String(d.to_string_fixed())),
762 (ColumnType::String, DataValue::Currency(c)) => Ok(DataValue::String(c.to_string_fixed())),
763 (ColumnType::Integer, DataValue::String(s)) => {
764 let parsed = parse_with_type(&s, &ColumnType::Integer)?;
765 if let DataValue::Integer(i) = parsed {
766 Ok(DataValue::Integer(i))
767 } else {
768 unreachable!()
769 }
770 }
771 (ColumnType::Float, DataValue::String(s)) => {
772 let parsed = parse_with_type(&s, &ColumnType::Float)?;
773 let mut value = match parsed {
774 DataValue::Float(f) => f,
775 _ => unreachable!(),
776 };
777 if should_round_float(mapping, strategy.as_deref()) {
778 value = round_float(value, resolve_scale(mapping));
779 }
780 Ok(DataValue::Float(value))
781 }
782 (ColumnType::Currency, DataValue::String(s)) => {
783 let decimal = parse_currency_decimal(&s)?;
784 let scale = explicit_currency_scale(mapping)?
785 .unwrap_or_else(|| default_currency_scale(&decimal));
786 let currency = CurrencyValue::quantize(decimal, scale, strategy.as_deref())?;
787 Ok(DataValue::Currency(currency))
788 }
789 (ColumnType::Decimal(spec), DataValue::String(s)) => {
790 let decimal = parse_decimal_literal(&s)?;
791 let fixed = FixedDecimalValue::from_decimal(decimal, spec, strategy.as_deref())?;
792 Ok(DataValue::Decimal(fixed))
793 }
794 (ColumnType::Boolean, DataValue::String(s)) => {
795 let parsed = parse_with_type(&s, &ColumnType::Boolean)?;
796 if let DataValue::Boolean(b) = parsed {
797 Ok(DataValue::Boolean(b))
798 } else {
799 unreachable!()
800 }
801 }
802 (ColumnType::Date, DataValue::String(s)) => {
803 let parsed = parse_string_to_date(&s, mapping)?;
804 Ok(DataValue::Date(parsed))
805 }
806 (ColumnType::DateTime, DataValue::String(s)) => {
807 let parsed = parse_string_to_datetime(&s, mapping)?;
808 Ok(DataValue::DateTime(parsed))
809 }
810 (ColumnType::Time, DataValue::String(s)) => {
811 let parsed = parse_string_to_time(&s, mapping)?;
812 Ok(DataValue::Time(parsed))
813 }
814 (ColumnType::Guid, DataValue::String(s)) => {
815 let parsed = parse_with_type(&s, &ColumnType::Guid)?;
816 if let DataValue::Guid(g) = parsed {
817 Ok(DataValue::Guid(g))
818 } else {
819 unreachable!()
820 }
821 }
822 (ColumnType::Date, DataValue::DateTime(dt)) => Ok(DataValue::Date(dt.date())),
823 (ColumnType::Time, DataValue::DateTime(dt)) => Ok(DataValue::Time(dt.time())),
824 (ColumnType::Float, DataValue::Integer(i)) => {
825 let mut value = i as f64;
826 if should_round_float(mapping, strategy.as_deref()) {
827 value = round_float(value, resolve_scale(mapping));
828 }
829 Ok(DataValue::Float(value))
830 }
831 (ColumnType::Currency, DataValue::Integer(i)) => {
832 let decimal = Decimal::from(i);
833 let scale = explicit_currency_scale(mapping)?
834 .unwrap_or_else(|| default_currency_scale(&decimal));
835 let currency = CurrencyValue::quantize(decimal, scale, strategy.as_deref())?;
836 Ok(DataValue::Currency(currency))
837 }
838 (ColumnType::Decimal(spec), DataValue::Integer(i)) => {
839 let decimal = Decimal::from(i);
840 let fixed = FixedDecimalValue::from_decimal(decimal, spec, strategy.as_deref())?;
841 Ok(DataValue::Decimal(fixed))
842 }
843 (ColumnType::Integer, DataValue::Float(f)) => {
844 let rounded = match strategy.as_deref() {
845 Some("truncate") => f.trunc() as i64,
846 _ => f.round() as i64,
847 };
848 Ok(DataValue::Integer(rounded))
849 }
850 (ColumnType::Float, DataValue::Float(f)) => {
851 let mut value = f;
852 if should_round_float(mapping, strategy.as_deref()) {
853 value = round_float(value, resolve_scale(mapping));
854 }
855 Ok(DataValue::Float(value))
856 }
857 (ColumnType::Currency, DataValue::Float(f)) => {
858 let decimal = Decimal::from_f64(f)
859 .ok_or_else(|| anyhow!("Failed to convert float {f} to decimal"))?;
860 let scale = explicit_currency_scale(mapping)?
861 .unwrap_or_else(|| default_currency_scale(&decimal));
862 let currency = CurrencyValue::quantize(decimal, scale, strategy.as_deref())?;
863 Ok(DataValue::Currency(currency))
864 }
865 (ColumnType::Decimal(spec), DataValue::Float(f)) => {
866 let decimal = Decimal::from_f64(f)
867 .ok_or_else(|| anyhow!("Failed to convert float {f} to decimal"))?;
868 let fixed = FixedDecimalValue::from_decimal(decimal, spec, strategy.as_deref())?;
869 Ok(DataValue::Decimal(fixed))
870 }
871 (ColumnType::Float, DataValue::Currency(c)) => {
872 let value = c
873 .to_f64()
874 .ok_or_else(|| anyhow!("Currency value out of f64 range"))?;
875 Ok(DataValue::Float(value))
876 }
877 (ColumnType::Integer, DataValue::Currency(c)) => {
878 let f = c
879 .to_f64()
880 .ok_or_else(|| anyhow!("Currency value out of range for integer conversion"))?;
881 let rounded = match strategy.as_deref() {
882 Some("truncate") => f.trunc() as i64,
883 _ => f.round() as i64,
884 };
885 Ok(DataValue::Integer(rounded))
886 }
887 (ColumnType::Currency, DataValue::Currency(c)) => {
888 let decimal = *c.amount();
889 let scale = explicit_currency_scale(mapping)?
890 .unwrap_or_else(|| default_currency_scale(&decimal));
891 let currency = CurrencyValue::quantize(decimal, scale, strategy.as_deref())?;
892 Ok(DataValue::Currency(currency))
893 }
894 (ColumnType::Decimal(spec), DataValue::Currency(c)) => {
895 let fixed = FixedDecimalValue::from_decimal(*c.amount(), spec, strategy.as_deref())?;
896 Ok(DataValue::Decimal(fixed))
897 }
898 (ColumnType::Float, DataValue::Decimal(d)) => {
899 let value = d
900 .to_f64()
901 .ok_or_else(|| anyhow!("Decimal value out of f64 range"))?;
902 Ok(DataValue::Float(value))
903 }
904 (ColumnType::Integer, DataValue::Decimal(d)) => {
905 let value = d
906 .to_f64()
907 .ok_or_else(|| anyhow!("Decimal value out of range for integer conversion"))?;
908 let rounded = match strategy.as_deref() {
909 Some("truncate") => value.trunc() as i64,
910 _ => value.round() as i64,
911 };
912 Ok(DataValue::Integer(rounded))
913 }
914 (ColumnType::Currency, DataValue::Decimal(d)) => {
915 let decimal = *d.amount();
916 let scale = explicit_currency_scale(mapping)?
917 .unwrap_or_else(|| default_currency_scale(&decimal));
918 let currency = CurrencyValue::quantize(decimal, scale, strategy.as_deref())?;
919 Ok(DataValue::Currency(currency))
920 }
921 (ColumnType::Decimal(spec), DataValue::Decimal(existing)) => {
922 if existing.precision() == spec.precision && existing.scale() == spec.scale {
923 Ok(DataValue::Decimal(existing))
924 } else {
925 let fixed =
926 FixedDecimalValue::from_decimal(*existing.amount(), spec, strategy.as_deref())?;
927 Ok(DataValue::Decimal(fixed))
928 }
929 }
930 (ColumnType::Integer, DataValue::Integer(i)) => Ok(DataValue::Integer(i)),
931 _ => bail!(
932 "Datatype mapping '{}' -> '{}' is not supported",
933 mapping.from,
934 mapping.to
935 ),
936 }
937}
938
939fn render_mapped_value(value: &DataValue, mapping: &DatatypeMapping) -> Result<String> {
940 match (&mapping.to, value) {
941 (ColumnType::String, DataValue::String(s)) => Ok(s.clone()),
942 (ColumnType::Integer, DataValue::Integer(i)) => Ok(i.to_string()),
943 (ColumnType::Float, DataValue::Float(f)) => {
944 let scale = resolve_scale(mapping);
945 Ok(format_float_with_scale(*f, scale))
946 }
947 (ColumnType::Boolean, DataValue::Boolean(b)) => Ok(b.to_string()),
948 (ColumnType::Date, DataValue::Date(d)) => {
949 let fmt = mapping
950 .options
951 .get("format")
952 .and_then(|v| v.as_str())
953 .unwrap_or("%Y-%m-%d");
954 Ok(d.format(fmt).to_string())
955 }
956 (ColumnType::DateTime, DataValue::DateTime(dt)) => {
957 let fmt = mapping
958 .options
959 .get("format")
960 .and_then(|v| v.as_str())
961 .unwrap_or("%Y-%m-%d %H:%M:%S");
962 Ok(dt.format(fmt).to_string())
963 }
964 (ColumnType::Time, DataValue::Time(t)) => {
965 let fmt = mapping
966 .options
967 .get("format")
968 .and_then(|v| v.as_str())
969 .unwrap_or("%H:%M:%S");
970 Ok(t.format(fmt).to_string())
971 }
972 (ColumnType::Guid, DataValue::Guid(g)) => Ok(g.to_string()),
973 (ColumnType::Currency, DataValue::Currency(c)) => Ok(c.to_string_fixed()),
974 (ColumnType::Decimal(spec), DataValue::Decimal(d)) => {
975 if d.scale() == spec.scale && d.precision() == spec.precision {
976 Ok(d.to_string_fixed())
977 } else {
978 let fixed = FixedDecimalValue::from_decimal(*d.amount(), spec, None)?;
979 Ok(fixed.to_string_fixed())
980 }
981 }
982 _ => bail!(
983 "Mapping output type '{:?}' is incompatible with computed value '{:?}'",
984 mapping.to,
985 value_column_type(value)
986 ),
987 }
988}
989
/// Formats `value` with exactly `scale` digits after the decimal point
/// (no decimal point at all when `scale` is zero).
fn format_float_with_scale(value: f64, scale: usize) -> String {
    // A precision of zero yields the same text as `{:.0}`, so no special case
    // is needed.
    format!("{value:.scale$}")
}
997
998fn should_round_float(mapping: &DatatypeMapping, strategy: Option<&str>) -> bool {
999 match strategy {
1000 Some("round") => true,
1001 Some(_) => false,
1002 None => mapping.from == ColumnType::Float && mapping.to == ColumnType::Float,
1003 }
1004}
1005
/// Rounds `value` to `scale` decimal places (half away from zero, as done by
/// `f64::round`).
fn round_float(value: f64, scale: usize) -> f64 {
    match scale {
        0 => value.round(),
        digits => {
            let factor = 10f64.powi(digits as i32);
            let shifted = value * factor;
            shifted.round() / factor
        }
    }
}
1014
1015fn resolve_scale(mapping: &DatatypeMapping) -> usize {
1016 mapping
1017 .options
1018 .get("scale")
1019 .and_then(|value| {
1020 value
1021 .as_u64()
1022 .map(|u| u as usize)
1023 .or_else(|| value.as_i64().map(|i| i.max(0) as usize))
1024 })
1025 .unwrap_or(4)
1026}
1027
1028fn explicit_currency_scale(mapping: &DatatypeMapping) -> Result<Option<u32>> {
1029 if let Some(scale) = mapping.options.get("scale") {
1030 let numeric = if let Some(value) = scale.as_u64() {
1031 value
1032 } else if let Some(value) = scale.as_i64() {
1033 ensure!(value >= 0, "Currency scale must be non-negative");
1034 value as u64
1035 } else {
1036 bail!("Currency scale must be numeric");
1037 };
1038 let scale_u32 = numeric as u32;
1039 ensure!(
1040 crate::data::CURRENCY_ALLOWED_SCALES.contains(&scale_u32),
1041 "Currency scale must be 2 or 4"
1042 );
1043 Ok(Some(scale_u32))
1044 } else {
1045 Ok(None)
1046 }
1047}
1048
1049fn default_currency_scale(decimal: &Decimal) -> u32 {
1050 let scale = decimal.scale();
1051 if scale == 0 {
1052 2
1053 } else if crate::data::CURRENCY_ALLOWED_SCALES.contains(&scale) {
1054 scale
1055 } else if scale > 4 {
1056 4
1057 } else {
1058 2
1059 }
1060}
1061
1062fn parse_string_to_date(value: &str, mapping: &DatatypeMapping) -> Result<NaiveDate> {
1063 let trimmed = value.trim();
1064 if let Some(fmt) = mapping.options.get("format").and_then(|v| v.as_str()) {
1065 NaiveDate::parse_from_str(trimmed, fmt)
1066 .with_context(|| format!("Parsing '{trimmed}' with format '{fmt}'"))
1067 } else {
1068 parse_naive_date(trimmed)
1069 }
1070}
1071
1072fn parse_string_to_datetime(value: &str, mapping: &DatatypeMapping) -> Result<NaiveDateTime> {
1073 let trimmed = value.trim();
1074 if let Some(fmt) = mapping.options.get("format").and_then(|v| v.as_str()) {
1075 NaiveDateTime::parse_from_str(trimmed, fmt)
1076 .with_context(|| format!("Parsing '{trimmed}' with format '{fmt}'"))
1077 } else {
1078 parse_naive_datetime(trimmed)
1079 }
1080}
1081
1082fn parse_string_to_time(value: &str, mapping: &DatatypeMapping) -> Result<NaiveTime> {
1083 let trimmed = value.trim();
1084 if let Some(fmt) = mapping.options.get("format").and_then(|v| v.as_str()) {
1085 NaiveTime::parse_from_str(trimmed, fmt)
1086 .with_context(|| format!("Parsing '{trimmed}' with format '{fmt}'"))
1087 } else {
1088 parse_naive_time(trimmed)
1089 }
1090}
1091
1092fn normalized_strategy(mapping: &DatatypeMapping) -> Option<String> {
1093 mapping
1094 .strategy
1095 .as_ref()
1096 .map(|s| s.trim().to_ascii_lowercase())
1097 .filter(|s| !s.is_empty())
1098}
1099
1100fn validate_mapping_options(column_name: &str, mapping: &DatatypeMapping) -> Result<()> {
1101 if let Some(strategy_raw) = mapping.strategy.as_ref() {
1102 let strategy = strategy_raw.trim();
1103 if !strategy.is_empty() {
1104 let normalized = strategy.to_ascii_lowercase();
1105 match normalized.as_str() {
1106 "round" | "trim" | "lowercase" | "uppercase" | "truncate" => {}
1107 other => {
1108 bail!(
1109 "Column '{}' mapping {} -> {} uses unsupported strategy '{}'",
1110 column_name,
1111 mapping.from,
1112 mapping.to,
1113 other
1114 );
1115 }
1116 }
1117 if matches!(normalized.as_str(), "trim" | "lowercase" | "uppercase") {
1118 ensure!(
1119 mapping.from == ColumnType::String && mapping.to == ColumnType::String,
1120 "Column '{}' mapping {} -> {} cannot apply '{}' strategy",
1121 column_name,
1122 mapping.from,
1123 mapping.to,
1124 strategy
1125 );
1126 }
1127 if normalized == "round" {
1128 ensure!(
1129 matches!(
1130 mapping.to,
1131 ColumnType::Float
1132 | ColumnType::Integer
1133 | ColumnType::String
1134 | ColumnType::Currency
1135 | ColumnType::Decimal(_)
1136 ),
1137 "Column '{}' mapping {} -> {} cannot apply 'round' strategy",
1138 column_name,
1139 mapping.from,
1140 mapping.to
1141 );
1142 }
1143 if normalized == "truncate" {
1144 ensure!(
1145 matches!(
1146 mapping.to,
1147 ColumnType::Integer | ColumnType::Currency | ColumnType::Decimal(_)
1148 ),
1149 "Column '{}' mapping {} -> {} cannot apply 'truncate' strategy",
1150 column_name,
1151 mapping.from,
1152 mapping.to
1153 );
1154 }
1155 }
1156 }
1157
1158 if let Some(scale) = mapping.options.get("scale") {
1159 let numeric = if let Some(value) = scale.as_u64() {
1160 value
1161 } else if let Some(value) = scale.as_i64() {
1162 ensure!(
1163 value >= 0,
1164 "Column '{}' mapping {} -> {} requires a non-negative scale",
1165 column_name,
1166 mapping.from,
1167 mapping.to
1168 );
1169 value as u64
1170 } else {
1171 bail!(
1172 "Column '{}' mapping {} -> {} requires 'scale' to be a number",
1173 column_name,
1174 mapping.from,
1175 mapping.to
1176 );
1177 };
1178
1179 if mapping.to == ColumnType::Currency {
1180 ensure!(
1181 crate::data::CURRENCY_ALLOWED_SCALES.contains(&(numeric as u32)),
1182 "Column '{}' mapping {} -> {} requires scale to be 2 or 4",
1183 column_name,
1184 mapping.from,
1185 mapping.to
1186 );
1187 }
1188 if matches!(mapping.to, ColumnType::Decimal(_)) {
1189 bail!(
1190 "Column '{}' mapping {} -> {} should define scale via the decimal datatype rather than a mapping option",
1191 column_name,
1192 mapping.from,
1193 mapping.to
1194 );
1195 }
1196 }
1197
1198 if let Some(format_value) = mapping.options.get("format") {
1199 ensure!(
1200 format_value.as_str().is_some(),
1201 "Column '{}' mapping {} -> {} requires 'format' to be a string",
1202 column_name,
1203 mapping.from,
1204 mapping.to
1205 );
1206 }
1207
1208 if mapping.options.contains_key("precision") {
1209 bail!(
1210 "Column '{}' mapping {} -> {} should define precision via the decimal datatype rather than a mapping option",
1211 column_name,
1212 mapping.from,
1213 mapping.to
1214 );
1215 }
1216
1217 Ok(())
1218}
1219
/// Per-column tally of how sampled values were classified during inference.
///
/// Counters are filled in by `TypeCandidate::update` and read by
/// `TypeCandidate::decide` to pick the column's datatype.
#[derive(Debug, Clone)]
struct TypeCandidate {
    /// Values counted so far; blank values and placeholder tokens are excluded.
    non_empty: usize,
    /// Values equal (ignoring ASCII case) to true/false/t/f/yes/no/y/n.
    boolean_matches: usize,
    /// Values `analyze_numeric_token` classified as integers.
    integer_matches: usize,
    /// Largest digit count seen among integer values.
    integer_max_digits: u32,
    /// Float-kind observations plus decimal observations flagged as overflowing.
    float_matches: usize,
    /// Values classified as decimals.
    decimal_matches: usize,
    /// Largest precision seen among decimal values.
    decimal_max_precision: u32,
    /// Largest scale seen among decimal values.
    decimal_max_scale: u32,
    /// Largest count of integer digits seen among decimal values.
    decimal_max_integer_digits: u32,
    /// Set once any decimal observation reports an overflow.
    decimal_precision_overflow: bool,
    /// Non-boolean, non-numeric values accepted by `parse_naive_date`.
    date_matches: usize,
    /// Values rejected as dates but accepted by `parse_naive_datetime`.
    datetime_matches: usize,
    /// Values rejected as dates/date-times but accepted by `parse_naive_time`.
    time_matches: usize,
    /// Remaining values that parse as a UUID once surrounding braces are removed.
    guid_matches: usize,
    /// Integer or decimal values whose scale fits a currency amount.
    currency_matches: usize,
    /// Numeric values whose observation reported a currency marker.
    currency_symbol_hits: usize,
    /// Values that matched none of the recognizers above.
    unclassified: usize,
}
1240
/// Broad numeric category assigned to a token by `analyze_numeric_token`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NumericKind {
    /// Digits only (no decimal point, no exponent) within the precision limit.
    Integer,
    /// Token with a decimal point, an exponent, or a non-zero scale.
    Decimal,
    /// Digits-only token whose digit count exceeds `DECIMAL_MAX_PRECISION`.
    Float,
}
1247
/// Facts extracted from one numeric-looking token by `analyze_numeric_token`.
#[derive(Debug, Clone, Copy)]
struct NumericObservation {
    /// Category the token was assigned to.
    kind: NumericKind,
    /// Total significant digit count (zero for float observations).
    precision: u32,
    /// Number of digits after the decimal point once the exponent is applied.
    scale: u32,
    /// Number of digits before the decimal point.
    integer_digits: u32,
    /// True when the token carried `$`, `€`, `£` or `¥`, or was wrapped in parentheses.
    has_currency_symbol: bool,
    /// True when the scale is zero or one of `CURRENCY_ALLOWED_SCALES`.
    fits_currency_scale: bool,
    /// True when precision or scale exceeds `DECIMAL_MAX_PRECISION`.
    overflow: bool,
}
1258
1259impl NumericObservation {
1260 fn integer(integer_digits: u32, has_currency_symbol: bool) -> Self {
1261 Self {
1262 kind: NumericKind::Integer,
1263 precision: integer_digits,
1264 scale: 0,
1265 integer_digits,
1266 has_currency_symbol,
1267 fits_currency_scale: true,
1268 overflow: false,
1269 }
1270 }
1271
1272 fn decimal(
1273 precision: u32,
1274 scale: u32,
1275 integer_digits: u32,
1276 has_currency_symbol: bool,
1277 fits_currency_scale: bool,
1278 overflow: bool,
1279 ) -> Self {
1280 Self {
1281 kind: NumericKind::Decimal,
1282 precision,
1283 scale,
1284 integer_digits,
1285 has_currency_symbol,
1286 fits_currency_scale,
1287 overflow,
1288 }
1289 }
1290
1291 fn float(has_currency_symbol: bool) -> Self {
1292 Self {
1293 kind: NumericKind::Float,
1294 precision: 0,
1295 scale: 0,
1296 integer_digits: 0,
1297 has_currency_symbol,
1298 fits_currency_scale: false,
1299 overflow: false,
1300 }
1301 }
1302}
1303
1304fn analyze_numeric_token(value: &str) -> Option<NumericObservation> {
1305 let trimmed = value.trim();
1306 if trimmed.is_empty() {
1307 return None;
1308 }
1309
1310 let mut body = trimmed;
1311 let mut had_parentheses = false;
1312 if body.starts_with('(') && body.ends_with(')') && body.len() > 2 {
1313 had_parentheses = true;
1314 body = &body[1..body.len() - 1];
1315 }
1316
1317 body = body.trim();
1318 if body.is_empty() {
1319 return None;
1320 }
1321
1322 let mut mantissa = String::with_capacity(body.len());
1323 let mut exponent = String::new();
1324 let mut in_exponent = false;
1325 let mut exponent_sign_allowed = false;
1326 let mut decimal_index: Option<usize> = None;
1327 let mut has_currency_symbol = false;
1328 let mut sign_consumed = had_parentheses;
1329
1330 for ch in body.chars() {
1331 match ch {
1332 '0'..='9' => {
1333 if in_exponent {
1334 exponent.push(ch);
1335 } else {
1336 mantissa.push(ch);
1337 }
1338 }
1339 '.' => {
1340 if in_exponent || decimal_index.is_some() {
1341 return None;
1342 }
1343 decimal_index = Some(mantissa.len());
1344 }
1345 'e' | 'E' => {
1346 if in_exponent {
1347 return None;
1348 }
1349 in_exponent = true;
1350 exponent_sign_allowed = true;
1351 continue;
1352 }
1353 '+' | '-' => {
1354 if in_exponent && exponent_sign_allowed {
1355 exponent.push(ch);
1356 exponent_sign_allowed = false;
1357 } else if !in_exponent && mantissa.is_empty() && !sign_consumed {
1358 sign_consumed = true;
1359 } else {
1360 return None;
1361 }
1362 }
1363 ',' | '_' | ' ' => {
1364 continue;
1365 }
1366 '$' | '€' | '£' | '¥' => {
1367 has_currency_symbol = true;
1368 continue;
1369 }
1370 _ => {
1371 return None;
1372 }
1373 }
1374 if ch != '+' && ch != '-' {
1375 exponent_sign_allowed = false;
1376 }
1377 }
1378
1379 if mantissa.is_empty() {
1380 return None;
1381 }
1382
1383 if decimal_index.is_none()
1384 && !in_exponent
1385 && mantissa.len() > 1
1386 && mantissa.chars().all(|c| c == '0')
1387 {
1388 return None;
1389 }
1390 if decimal_index.is_none() && !in_exponent && mantissa.len() > 1 && mantissa.starts_with('0') {
1391 return None;
1392 }
1393
1394 let mantissa_scale = decimal_index.map(|pos| mantissa.len() - pos).unwrap_or(0);
1395
1396 let exponent_value = if in_exponent {
1397 if exponent.is_empty() || exponent == "+" || exponent == "-" {
1398 return None;
1399 }
1400 match exponent.parse::<i32>() {
1401 Ok(value) => value,
1402 Err(_) => return None,
1403 }
1404 } else {
1405 0
1406 };
1407
1408 let mut digits = mantissa.clone();
1409 let mut scale_i32 = mantissa_scale as i32 - exponent_value;
1410 if scale_i32 < 0 {
1411 let zeros = (-scale_i32) as usize;
1412 digits.push_str(&"0".repeat(zeros));
1413 scale_i32 = 0;
1414 }
1415 let scale = scale_i32.max(0) as u32;
1416 let digits_len = digits.len() as u32;
1417 let integer_digits = digits_len.saturating_sub(scale);
1418
1419 let mut precision = if digits_len == 0 {
1420 0
1421 } else if integer_digits == 0 {
1422 scale.max(1)
1423 } else {
1424 integer_digits + scale
1425 };
1426 if precision == 0 {
1427 precision = 1;
1428 }
1429
1430 let fits_currency_scale = scale == 0 || crate::data::CURRENCY_ALLOWED_SCALES.contains(&scale);
1431 let overflow = precision > DECIMAL_MAX_PRECISION || scale > DECIMAL_MAX_PRECISION;
1432
1433 if in_exponent || decimal_index.is_some() || scale > 0 {
1434 return Some(NumericObservation::decimal(
1435 precision,
1436 scale,
1437 integer_digits,
1438 has_currency_symbol || had_parentheses,
1439 fits_currency_scale,
1440 overflow,
1441 ));
1442 }
1443
1444 if overflow {
1445 return Some(NumericObservation::float(
1446 has_currency_symbol || had_parentheses,
1447 ));
1448 }
1449
1450 Some(NumericObservation::integer(
1451 integer_digits,
1452 has_currency_symbol || had_parentheses,
1453 ))
1454}
1455
/// Minimum percentage of a column's counted values that must carry a currency
/// marker before the column is promoted to the currency type.
const CURRENCY_SYMBOL_PROMOTION_THRESHOLD: usize = 30;
/// Maximum number of distinct values a `SummaryAccumulator` tracks
/// individually; later distinct values are only counted in `other_values`.
const SUMMARY_TRACKED_LIMIT: usize = 5;
/// Schema format version string.
/// NOTE(review): not referenced in this part of the file — presumably stamped
/// onto saved schemas; confirm against the (de)serialization code.
const CURRENT_SCHEMA_VERSION: &str = "1.1.0";
1459
/// Running value-frequency summary for one column, built during inference.
#[derive(Clone, Default)]
struct SummaryAccumulator {
    /// Number of values recorded.
    non_empty: usize,
    /// First distinct values seen (up to `SUMMARY_TRACKED_LIMIT`) with their counts.
    tracked: Vec<(String, usize)>,
    /// Occurrences of values that arrived after the tracking slots were full.
    other_values: usize,
}
1466
1467impl SummaryAccumulator {
1468 fn record(&mut self, value: &str) {
1469 self.non_empty += 1;
1470 if let Some((_, count)) = self
1471 .tracked
1472 .iter_mut()
1473 .find(|(existing, _)| existing == value)
1474 {
1475 *count += 1;
1476 return;
1477 }
1478 if self.tracked.len() < SUMMARY_TRACKED_LIMIT {
1479 self.tracked.push((value.to_string(), 1));
1480 } else {
1481 self.other_values += 1;
1482 }
1483 }
1484
1485 fn finalize(self) -> ColumnSummary {
1486 ColumnSummary {
1487 non_empty: self.non_empty,
1488 tracked_values: self.tracked,
1489 other_values: self.other_values,
1490 }
1491 }
1492}
1493
1494impl TypeCandidate {
1495 fn new() -> Self {
1496 Self {
1497 non_empty: 0,
1498 boolean_matches: 0,
1499 integer_matches: 0,
1500 integer_max_digits: 0,
1501 float_matches: 0,
1502 decimal_matches: 0,
1503 decimal_max_precision: 0,
1504 decimal_max_scale: 0,
1505 decimal_max_integer_digits: 0,
1506 decimal_precision_overflow: false,
1507 date_matches: 0,
1508 datetime_matches: 0,
1509 time_matches: 0,
1510 guid_matches: 0,
1511 currency_matches: 0,
1512 currency_symbol_hits: 0,
1513 unclassified: 0,
1514 }
1515 }
1516
1517 fn update(&mut self, value: &str) {
1518 let trimmed = value.trim();
1519 if trimmed.is_empty() {
1520 return;
1521 }
1522
1523 let lowered = trimmed.to_ascii_lowercase();
1524 if is_placeholder_token(&lowered) {
1525 return;
1526 }
1527
1528 self.non_empty += 1;
1529 let mut parsed_any = false;
1530
1531 if matches!(
1532 lowered.as_str(),
1533 "true" | "false" | "t" | "f" | "yes" | "no" | "y" | "n"
1534 ) {
1535 self.boolean_matches += 1;
1536 parsed_any = true;
1537 }
1538
1539 if let Some(observation) = analyze_numeric_token(trimmed) {
1540 parsed_any = true;
1541 match observation.kind {
1542 NumericKind::Integer => {
1543 self.integer_matches += 1;
1544 self.integer_max_digits =
1545 self.integer_max_digits.max(observation.integer_digits);
1546 if observation.fits_currency_scale {
1547 self.currency_matches += 1;
1548 }
1549 }
1550 NumericKind::Decimal => {
1551 self.decimal_matches += 1;
1552 self.decimal_max_precision =
1553 self.decimal_max_precision.max(observation.precision);
1554 self.decimal_max_scale = self.decimal_max_scale.max(observation.scale);
1555 self.decimal_max_integer_digits = self
1556 .decimal_max_integer_digits
1557 .max(observation.integer_digits);
1558 if observation.fits_currency_scale {
1559 self.currency_matches += 1;
1560 }
1561 if observation.overflow {
1562 self.decimal_precision_overflow = true;
1563 self.float_matches += 1;
1564 }
1565 }
1566 NumericKind::Float => {
1567 self.float_matches += 1;
1568 }
1569 }
1570 if observation.has_currency_symbol {
1571 self.currency_symbol_hits += 1;
1572 }
1573 }
1574
1575 if !parsed_any && parse_naive_date(trimmed).is_ok() {
1576 self.date_matches += 1;
1577 parsed_any = true;
1578 }
1579 if !parsed_any && parse_naive_datetime(trimmed).is_ok() {
1580 self.datetime_matches += 1;
1581 parsed_any = true;
1582 }
1583 if !parsed_any && parse_naive_time(trimmed).is_ok() {
1584 self.time_matches += 1;
1585 parsed_any = true;
1586 }
1587
1588 let trimmed_guid = trimmed.trim_matches(|c| matches!(c, '{' | '}'));
1589 if !parsed_any && Uuid::parse_str(trimmed_guid).is_ok() {
1590 self.guid_matches += 1;
1591 parsed_any = true;
1592 }
1593
1594 if !parsed_any {
1595 self.unclassified += 1;
1596 }
1597 }
1598
1599 fn majority(&self, count: usize) -> bool {
1600 count > 0 && count * 2 > self.non_empty
1601 }
1602
1603 fn decimal_spec(&self) -> Option<DecimalSpec> {
1604 if self.decimal_matches == 0 {
1605 return None;
1606 }
1607 if self.decimal_precision_overflow {
1608 return None;
1609 }
1610
1611 let scale = self.decimal_max_scale.min(DECIMAL_MAX_PRECISION);
1612 let integer_digits = self.decimal_max_integer_digits.max(self.integer_max_digits);
1613
1614 let mut precision = if integer_digits == 0 {
1615 scale.max(1)
1616 } else {
1617 integer_digits + scale
1618 };
1619 precision = precision.max(self.decimal_max_precision);
1620
1621 if precision > DECIMAL_MAX_PRECISION {
1622 return None;
1623 }
1624
1625 DecimalSpec::new(precision, scale).ok()
1626 }
1627
1628 fn decide(&self) -> ColumnType {
1629 if self.non_empty == 0 {
1630 return ColumnType::String;
1631 }
1632 if self.unclassified > 0 {
1633 return ColumnType::String;
1634 }
1635 let promote_currency = self.should_promote_currency();
1636 if self.majority(self.boolean_matches) {
1637 ColumnType::Boolean
1638 } else if promote_currency {
1639 ColumnType::Currency
1640 } else if let Some(spec) = self.decimal_spec() {
1641 ColumnType::Decimal(spec)
1642 } else if self.decimal_matches > 0 {
1643 ColumnType::Float
1644 } else if self.majority(self.integer_matches) {
1645 ColumnType::Integer
1646 } else if self.majority(self.currency_matches) && self.currency_symbol_hits > 0 {
1647 ColumnType::Currency
1648 } else if self.majority(self.float_matches) {
1649 ColumnType::Float
1650 } else if self.majority(self.date_matches) {
1651 ColumnType::Date
1652 } else if self.majority(self.datetime_matches) {
1653 ColumnType::DateTime
1654 } else if self.majority(self.time_matches) {
1655 ColumnType::Time
1656 } else if self.majority(self.guid_matches) {
1657 ColumnType::Guid
1658 } else {
1659 ColumnType::String
1660 }
1661 }
1662
1663 fn currency_symbol_ratio_meets_threshold(&self) -> bool {
1664 if self.non_empty == 0 {
1665 return false;
1666 }
1667 self.currency_symbol_hits.saturating_mul(100)
1668 >= self
1669 .non_empty
1670 .saturating_mul(CURRENCY_SYMBOL_PROMOTION_THRESHOLD)
1671 }
1672
1673 fn should_promote_currency(&self) -> bool {
1674 self.currency_matches > 0
1675 && self.currency_matches == self.non_empty
1676 && self.currency_symbol_ratio_meets_threshold()
1677 }
1678}
1679
/// Reports whether an already-lowercased token stands for "no value".
///
/// Leading `#` characters are ignored. The remainder is a placeholder when it
/// is one of a fixed set of words, starts with `invalid`, or consists solely
/// of dashes. An empty remainder (e.g. the input `#` or `###`) also satisfies
/// the dash check and is therefore reported as a placeholder.
fn is_placeholder_token(lowered: &str) -> bool {
    const EXACT_TOKENS: [&str; 7] = ["na", "n/a", "n.a.", "null", "none", "unknown", "missing"];

    let core = lowered.trim_start_matches('#');
    if EXACT_TOKENS.contains(&core) || core.starts_with("invalid") {
        return true;
    }
    !core.chars().any(|c| c != '-')
}
1688
1689fn placeholder_token_original(value: &str) -> Option<String> {
1690 let trimmed = value.trim();
1691 if trimmed.is_empty() {
1692 return None;
1693 }
1694 let lowered = trimmed.to_ascii_lowercase();
1695 if is_placeholder_token(&lowered) {
1696 Some(trimmed.to_string())
1697 } else {
1698 None
1699 }
1700}
1701
/// Builds the set of lowercase spellings under which a header may reappear
/// inside the data.
///
/// For a non-blank header the set contains: the trimmed header itself; the
/// text after the last `_`, space, or `/`; the header reduced to ASCII
/// alphanumerics plus `+`/`-`; that reduced form's first and last character
/// joined together; and its 3- and 4-character prefixes. A blank header
/// yields an empty set.
fn build_header_aliases(header: &str) -> HashSet<String> {
    /// Inserts the trimmed, lowercased candidate unless it is blank.
    fn add(aliases: &mut HashSet<String>, candidate: &str) {
        let token = candidate.trim();
        if !token.is_empty() {
            aliases.insert(token.to_ascii_lowercase());
        }
    }

    let mut aliases = HashSet::new();
    let trimmed = header.trim();
    if trimmed.is_empty() {
        return aliases;
    }

    add(&mut aliases, trimmed);

    // Only headers that actually contain the separator contribute a tail.
    for sep in ['_', ' ', '/'] {
        if let Some((_, tail)) = trimmed.rsplit_once(sep) {
            add(&mut aliases, tail);
        }
    }

    let sanitized: String = trimmed
        .chars()
        .filter(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '+' | '-'))
        .collect();
    if sanitized.is_empty() {
        return aliases;
    }
    add(&mut aliases, &sanitized);

    // `sanitized` is pure ASCII, so bytes and characters coincide.
    let bytes = sanitized.as_bytes();
    if bytes.len() >= 2 {
        let mut shorthand = String::with_capacity(2);
        shorthand.push(char::from(bytes[0]));
        shorthand.push(char::from(bytes[bytes.len() - 1]));
        add(&mut aliases, &shorthand);
    }
    for prefix_len in [3usize, 4] {
        if let Some(prefix) = sanitized.get(..prefix_len) {
            add(&mut aliases, prefix);
        }
    }

    aliases
}
1750
1751fn row_values_look_like_header<'a, I>(row: I, header_aliases: &[HashSet<String>]) -> bool
1752where
1753 I: IntoIterator<Item = Option<Cow<'a, str>>>,
1754{
1755 let mut alias_hits = 0usize;
1756 let mut non_empty_fields = 0usize;
1757
1758 for (idx, value_opt) in row.into_iter().enumerate() {
1759 if idx >= header_aliases.len() {
1760 break;
1761 }
1762 let Some(value) = value_opt else {
1763 continue;
1764 };
1765 let trimmed = value.trim();
1766 if trimmed.is_empty() {
1767 continue;
1768 }
1769 non_empty_fields += 1;
1770 let lowered = trimmed.to_ascii_lowercase();
1771 if header_aliases[idx].contains(&lowered) {
1772 alias_hits += 1;
1773 }
1774 }
1775
1776 non_empty_fields >= HEADER_ALIAS_MIN_MATCHES
1777 && alias_hits >= HEADER_ALIAS_MIN_MATCHES
1778 && alias_hits.saturating_mul(100)
1779 >= non_empty_fields.saturating_mul(HEADER_ALIAS_THRESHOLD_PERCENT)
1780}
1781
1782fn option_row_looks_like_header(
1783 row: &[Option<String>],
1784 header_aliases: &[HashSet<String>],
1785) -> bool {
1786 row_values_look_like_header(
1787 row.iter().map(|value| value.as_deref().map(Cow::Borrowed)),
1788 header_aliases,
1789 )
1790}
1791
1792pub(crate) fn row_looks_like_header(row: &[String], header_aliases: &[HashSet<String>]) -> bool {
1793 row_values_look_like_header(
1794 row.iter().map(|value| Some(Cow::Borrowed(value.as_str()))),
1795 header_aliases,
1796 )
1797}
1798
/// Produces `count` synthetic column names: `field_0`, `field_1`, …
fn generate_field_names(count: usize) -> Vec<String> {
    let mut names = Vec::with_capacity(count);
    for idx in 0..count {
        names.push(format!("field_{idx}"));
    }
    names
}
1802
1803fn token_is_common_header(value: &str) -> bool {
1804 if value.is_empty() {
1805 return false;
1806 }
1807 let normalized = value.trim().to_ascii_lowercase();
1808 if normalized.is_empty() {
1809 return false;
1810 }
1811 if COMMON_HEADER_TOKENS
1812 .iter()
1813 .any(|token| normalized == *token)
1814 {
1815 return true;
1816 }
1817 let sanitized = normalized
1818 .chars()
1819 .map(|ch| match ch {
1820 ' ' | '-' | '/' => '_',
1821 other => other,
1822 })
1823 .collect::<String>();
1824 COMMON_HEADER_TOKENS.iter().any(|token| sanitized == *token)
1825}
1826
1827fn value_is_data_like(value: &str) -> bool {
1828 let trimmed = value.trim();
1829 if trimmed.is_empty() {
1830 return false;
1831 }
1832 let lowered = trimmed.to_ascii_lowercase();
1833 if matches!(
1834 lowered.as_str(),
1835 "true" | "false" | "t" | "f" | "yes" | "no" | "y" | "n" | "1" | "0"
1836 ) {
1837 return true;
1838 }
1839 if parse_decimal_literal(trimmed).is_ok() {
1840 return true;
1841 }
1842 if parse_currency_decimal(trimmed).is_ok() {
1843 return true;
1844 }
1845 if trimmed.parse::<i64>().is_ok() {
1846 return true;
1847 }
1848 if trimmed.parse::<f64>().is_ok() {
1849 return true;
1850 }
1851 if parse_naive_datetime(trimmed).is_ok() {
1852 return true;
1853 }
1854 if parse_naive_date(trimmed).is_ok() {
1855 return true;
1856 }
1857 if parse_naive_time(trimmed).is_ok() {
1858 return true;
1859 }
1860 let trimmed_guid = trimmed.trim_matches(|c| matches!(c, '{' | '}'));
1861 Uuid::parse_str(trimmed_guid).is_ok()
1862}
1863
1864fn value_is_header_like(value: &str) -> bool {
1865 let trimmed = value.trim();
1866 if trimmed.is_empty() {
1867 return false;
1868 }
1869 if value_is_data_like(trimmed) {
1870 return false;
1871 }
1872 trimmed.chars().any(|c| c.is_ascii_alphabetic()) || token_is_common_header(trimmed)
1873}
1874
1875fn header_tokens_match_dictionary(row: &[String]) -> bool {
1876 row.iter()
1877 .filter(|value| token_is_common_header(value.trim()))
1878 .count()
1879 >= 2
1880}
1881
1882fn infer_has_header(first_row: &[String], other_rows: &[Vec<String>]) -> bool {
1883 let header_like_first = first_row
1884 .iter()
1885 .filter(|value| value_is_header_like(value))
1886 .count();
1887 let data_like_first = first_row
1888 .iter()
1889 .filter(|value| value_is_data_like(value))
1890 .count();
1891
1892 if header_like_first == 0 && data_like_first == 0 {
1893 return false;
1894 }
1895
1896 if data_like_first > header_like_first {
1897 return false;
1898 }
1899
1900 if other_rows.is_empty() {
1901 return header_like_first >= 2 || header_tokens_match_dictionary(first_row);
1902 }
1903
1904 let mut header_signal = 0usize;
1905 let mut data_signal = 0usize;
1906
1907 for column in 0..first_row.len() {
1908 let first_value = first_row.get(column).map(|s| s.as_str()).unwrap_or("");
1909 let first_is_header = value_is_header_like(first_value);
1910 let first_is_data = value_is_data_like(first_value);
1911
1912 let mut other_has_data = false;
1913 for row in other_rows {
1914 if let Some(value) = row.get(column)
1915 && value_is_data_like(value)
1916 {
1917 other_has_data = true;
1918 break;
1919 }
1920 }
1921
1922 if first_is_header && other_has_data {
1923 header_signal += 1;
1924 } else if first_is_data && other_has_data {
1925 data_signal += 1;
1926 }
1927 }
1928
1929 if header_signal > data_signal {
1930 return true;
1931 }
1932 if data_signal > header_signal {
1933 return false;
1934 }
1935
1936 if header_tokens_match_dictionary(first_row) && header_like_first >= 1 {
1937 return true;
1938 }
1939
1940 header_like_first > data_like_first
1941}
1942
1943pub fn detect_csv_layout(
1944 path: &Path,
1945 delimiter: u8,
1946 encoding: &'static Encoding,
1947 header_override: Option<bool>,
1948) -> Result<CsvLayout> {
1949 if io_utils::is_dash(path) {
1950 return Ok(CsvLayout {
1951 headers: Vec::new(),
1952 has_headers: header_override.unwrap_or(true),
1953 });
1954 }
1955
1956 if let Some(force_header) = header_override {
1957 let mut reader = io_utils::open_csv_reader_from_path(path, delimiter, force_header)?;
1958 if force_header {
1959 let header_record = reader.byte_headers()?.clone();
1960 let headers = io_utils::decode_headers(&header_record, encoding)?;
1961 return Ok(CsvLayout {
1962 headers,
1963 has_headers: true,
1964 });
1965 } else {
1966 let mut record = csv::ByteRecord::new();
1967 let width = if reader.read_byte_record(&mut record)? {
1968 record.len()
1969 } else {
1970 0
1971 };
1972 let headers = generate_field_names(width);
1973 return Ok(CsvLayout {
1974 headers,
1975 has_headers: false,
1976 });
1977 }
1978 }
1979
1980 let mut reader = io_utils::open_csv_reader_from_path(path, delimiter, false)?;
1981 let mut record = csv::ByteRecord::new();
1982 let mut decoded_rows = Vec::new();
1983
1984 while decoded_rows.len() < HEADER_DETECTION_SAMPLE_ROWS
1985 && reader.read_byte_record(&mut record)?
1986 {
1987 let decoded = io_utils::decode_record(&record, encoding)?;
1988 decoded_rows.push(decoded);
1989 }
1990
1991 if decoded_rows.is_empty() {
1992 return Ok(CsvLayout {
1993 headers: Vec::new(),
1994 has_headers: true,
1995 });
1996 }
1997
1998 let first_row = decoded_rows.first().cloned().unwrap_or_default();
1999 let has_header = infer_has_header(&first_row, &decoded_rows[1..]);
2000 let headers = if has_header {
2001 first_row
2002 } else {
2003 generate_field_names(first_row.len())
2004 };
2005
2006 Ok(CsvLayout {
2007 headers,
2008 has_headers: has_header,
2009 })
2010}
2011
2012pub fn infer_schema(
2013 path: &Path,
2014 sample_rows: usize,
2015 delimiter: u8,
2016 encoding: &'static Encoding,
2017 header_override: Option<bool>,
2018) -> Result<Schema> {
2019 let policy = PlaceholderPolicy::default();
2020 let (schema, _stats) = infer_schema_with_stats(
2021 path,
2022 sample_rows,
2023 delimiter,
2024 encoding,
2025 &policy,
2026 header_override,
2027 )?;
2028 Ok(schema)
2029}
2030
2031pub fn infer_schema_with_stats(
2032 path: &Path,
2033 sample_rows: usize,
2034 delimiter: u8,
2035 encoding: &'static Encoding,
2036 _placeholder_policy: &PlaceholderPolicy,
2037 header_override: Option<bool>,
2038) -> Result<(Schema, InferenceStats)> {
2039 let layout = detect_csv_layout(path, delimiter, encoding, header_override)?;
2040 let mut reader = io_utils::open_csv_reader_from_path(path, delimiter, layout.has_headers)?;
2041 let headers = if layout.has_headers {
2042 let header_record = reader.byte_headers()?.clone();
2043 io_utils::decode_headers(&header_record, encoding)?
2044 } else {
2045 layout.headers.clone()
2046 };
2047 let mut candidates = vec![TypeCandidate::new(); headers.len()];
2048 let mut samples = vec![None; headers.len()];
2049 let mut summaries = vec![SummaryAccumulator::default(); headers.len()];
2050 let mut placeholders = vec![PlaceholderSummary::default(); headers.len()];
2051 let header_aliases: Vec<HashSet<String>> = headers
2052 .iter()
2053 .map(|header| build_header_aliases(header))
2054 .collect();
2055
2056 let mut record = csv::ByteRecord::new();
2057 let mut processed = 0usize;
2058 let mut decode_errors = 0usize;
2059 while reader.read_byte_record(&mut record)? {
2060 if sample_rows > 0 && processed >= sample_rows {
2061 break;
2062 }
2063 let mut decoded_row: Vec<Option<String>> = Vec::with_capacity(headers.len());
2064
2065 for field in record.iter().take(headers.len()) {
2066 if field.is_empty() {
2067 decoded_row.push(None);
2068 continue;
2069 }
2070 match io_utils::decode_bytes(field, encoding) {
2071 Ok(decoded) => {
2072 let trimmed = decoded.trim();
2073 if trimmed.is_empty() {
2074 decoded_row.push(None);
2075 continue;
2076 }
2077 let value = trimmed.to_string();
2078 decoded_row.push(Some(value));
2079 }
2080 Err(_) => {
2081 decode_errors += 1;
2082 decoded_row.push(None);
2083 }
2084 }
2085 }
2086
2087 while decoded_row.len() < headers.len() {
2088 decoded_row.push(None);
2089 }
2090
2091 let header_like = option_row_looks_like_header(&decoded_row, &header_aliases);
2092
2093 if header_like {
2094 continue;
2095 }
2096
2097 for (idx, value_opt) in decoded_row.into_iter().enumerate() {
2098 let Some(value) = value_opt else {
2099 continue;
2100 };
2101 if let Some(token) = placeholder_token_original(&value) {
2102 placeholders[idx].record(&token);
2103 continue;
2104 }
2105 candidates[idx].update(&value);
2106 summaries[idx].record(&value);
2107 if samples[idx].is_none() {
2108 samples[idx] = Some(value.clone());
2109 }
2110 }
2111 processed += 1;
2112 }
2113
2114 let columns = headers
2115 .iter()
2116 .enumerate()
2117 .map(|(idx, header)| ColumnMeta {
2118 name: header.clone(),
2119 datatype: candidates[idx].decide(),
2120 rename: None,
2121 value_replacements: Vec::new(),
2122 datatype_mappings: Vec::new(),
2123 })
2124 .collect();
2125
2126 let schema = Schema {
2127 columns,
2128 schema_version: None,
2129 has_headers: layout.has_headers,
2130 };
2131 let stats = InferenceStats {
2132 sample_values: samples,
2133 rows_read: processed,
2134 requested_rows: sample_rows,
2135 decode_errors,
2136 summaries: summaries
2137 .into_iter()
2138 .map(SummaryAccumulator::finalize)
2139 .collect(),
2140 placeholder_summaries: placeholders,
2141 };
2142
2143 Ok((schema, stats))
2144}
2145
2146pub(crate) fn format_hint_for(datatype: &ColumnType, sample: Option<&str>) -> Option<String> {
2147 let sample = sample?;
2148 match datatype {
2149 ColumnType::DateTime => {
2150 if sample.contains('T') {
2151 Some("ISO 8601 date-time".to_string())
2152 } else if sample.contains('/') {
2153 Some("Slash-separated date-time".to_string())
2154 } else if sample.contains('-') {
2155 Some("Hyphen-separated date-time".to_string())
2156 } else {
2157 Some("Date-time without delimiter hints".to_string())
2158 }
2159 }
2160 ColumnType::Date => {
2161 if sample.contains('/') {
2162 Some("Slash-separated date".to_string())
2163 } else if sample.contains('-') {
2164 Some("Hyphen-separated date".to_string())
2165 } else if sample.contains('.') {
2166 Some("Dot-separated date".to_string())
2167 } else {
2168 Some("Date without delimiter hints".to_string())
2169 }
2170 }
2171 ColumnType::Time => {
2172 if sample.contains('.') {
2173 Some("Time with fractional seconds".to_string())
2174 } else {
2175 Some("Colon-separated time".to_string())
2176 }
2177 }
2178 ColumnType::Boolean => {
2179 let normalized = sample.trim().to_ascii_lowercase();
2180 if matches!(normalized.as_str(), "true" | "false" | "t" | "f") {
2181 Some("Boolean (true/false tokens)".to_string())
2182 } else if matches!(normalized.as_str(), "yes" | "no" | "y" | "n") {
2183 Some("Boolean (yes/no tokens)".to_string())
2184 } else if matches!(normalized.as_str(), "1" | "0") {
2185 Some("Boolean (1/0 tokens)".to_string())
2186 } else {
2187 Some("Boolean (mixed tokens)".to_string())
2188 }
2189 }
2190 ColumnType::Float => {
2191 let has_currency = ["$", "€", "£", "¥"]
2192 .iter()
2193 .any(|symbol| sample.contains(symbol));
2194 if has_currency {
2195 Some("Currency symbol detected".to_string())
2196 } else if sample.contains(',') {
2197 Some("Thousands separator present".to_string())
2198 } else if sample.contains('.') {
2199 Some("Decimal point".to_string())
2200 } else {
2201 Some("Floating number without decimal point".to_string())
2202 }
2203 }
2204 ColumnType::Decimal(spec) => Some(format!(
2205 "Fixed decimal (precision {}, scale {})",
2206 spec.precision, spec.scale
2207 )),
2208 ColumnType::Currency => Some("Currency amount (2 or 4 decimal places)".to_string()),
2209 ColumnType::Integer => {
2210 if sample.starts_with('0') && sample.len() > 1 {
2211 Some("Leading zeros preserved".to_string())
2212 } else {
2213 Some("Whole number".to_string())
2214 }
2215 }
2216 ColumnType::Guid => {
2217 if sample.contains('{') || sample.contains('}') {
2218 Some("GUID with braces".to_string())
2219 } else if sample.contains('-') {
2220 Some("Canonical GUID".to_string())
2221 } else {
2222 Some("GUID without separators".to_string())
2223 }
2224 }
2225 ColumnType::String => None,
2226 }
2227}
2228
2229impl ColumnMeta {
2230 pub fn has_mappings(&self) -> bool {
2231 !self.datatype_mappings.is_empty()
2232 }
2233
2234 pub fn output_name(&self) -> &str {
2235 self.rename
2236 .as_deref()
2237 .filter(|value| !value.is_empty())
2238 .unwrap_or(&self.name)
2239 }
2240
2241 pub fn matches_header(&self, header: &str) -> bool {
2242 if header == self.name {
2243 return true;
2244 }
2245 if let Some(rename) = self.rename.as_deref()
2246 && !rename.is_empty()
2247 && header == rename
2248 {
2249 return true;
2250 }
2251 false
2252 }
2253
2254 pub fn apply_mappings_to_value(&self, value: &str) -> Result<Option<String>> {
2255 if value.is_empty() {
2256 return Ok(None);
2257 }
2258 if !self.has_mappings() {
2259 return Ok(Some(value.to_string()));
2260 }
2261
2262 let first_mapping = self
2263 .datatype_mappings
2264 .first()
2265 .expect("has_mappings() guarantees at least one mapping");
2266
2267 let mut current = parse_initial_value(value, first_mapping)?;
2268 for mapping in &self.datatype_mappings {
2269 let current_type = value_column_type(¤t);
2270 ensure!(
2271 current_type == mapping.from,
2272 "Datatype mapping chain expects '{:?}' but encountered '{:?}'",
2273 mapping.from,
2274 current_type
2275 );
2276 current = apply_single_mapping(mapping, current)?;
2277 }
2278
2279 let last_mapping = self
2280 .datatype_mappings
2281 .last()
2282 .expect("non-empty mapping chain");
2283 let rendered = render_mapped_value(¤t, last_mapping)?;
2284 if rendered.is_empty() {
2285 Ok(None)
2286 } else {
2287 Ok(Some(rendered))
2288 }
2289 }
2290
2291 pub fn normalize_value<'a>(&self, value: &'a str) -> Cow<'a, str> {
2292 for replacement in &self.value_replacements {
2293 if value == replacement.from {
2294 return Cow::Owned(replacement.to.clone());
2295 }
2296 }
2297 Cow::Borrowed(value)
2298 }
2299}
2300
2301impl Schema {
2302 pub fn has_transformations(&self) -> bool {
2303 self.columns.iter().any(|column| column.has_mappings())
2304 }
2305
2306 pub fn apply_transformations_to_row(&self, row: &mut [String]) -> Result<()> {
2307 for (idx, column) in self.columns.iter().enumerate() {
2308 if !column.has_mappings() {
2309 continue;
2310 }
2311 if let Some(cell) = row.get_mut(idx) {
2312 let original = cell.clone();
2313 match column
2314 .apply_mappings_to_value(&original)
2315 .with_context(|| format!("Column '{}'", column.name))?
2316 {
2317 Some(mapped) => *cell = mapped,
2318 None => cell.clear(),
2319 }
2320 }
2321 }
2322 Ok(())
2323 }
2324
2325 pub fn apply_replacements_to_row(&self, row: &mut [String]) {
2326 for (idx, column) in self.columns.iter().enumerate() {
2327 if let Some(value) = row.get_mut(idx)
2328 && let Cow::Owned(normalized) = column.normalize_value(value)
2329 {
2330 *value = normalized;
2331 }
2332 }
2333 }
2334
2335 pub fn validate_datatype_mappings(&self) -> Result<()> {
2336 self.validate_decimal_specs()?;
2337 for column in &self.columns {
2338 if column.datatype_mappings.is_empty() {
2339 continue;
2340 }
2341 let mut previous_to = None;
2342 for (step_index, mapping) in column.datatype_mappings.iter().enumerate() {
2343 if let Some(expected) = previous_to.as_ref() {
2344 ensure!(
2345 mapping.from == *expected,
2346 "Column '{}' mapping step {} expects input '{:?}' but prior step outputs '{:?}'",
2347 column.name,
2348 step_index + 1,
2349 mapping.from,
2350 expected
2351 );
2352 }
2353 validate_mapping_options(&column.name, mapping)?;
2354 previous_to = Some(mapping.to.clone());
2355 }
2356 let terminal = previous_to.expect("mapping chain must have terminal type");
2357 ensure!(
2358 terminal == column.datatype,
2359 "Column '{}' mappings terminate at '{:?}' but column datatype is '{:?}'",
2360 column.name,
2361 terminal,
2362 column.datatype
2363 );
2364 }
2365 Ok(())
2366 }
2367
2368 fn validate_decimal_specs(&self) -> Result<()> {
2369 for column in &self.columns {
2370 if let ColumnType::Decimal(spec) = &column.datatype {
2371 spec.ensure_valid()?;
2372 }
2373 for mapping in &column.datatype_mappings {
2374 if let ColumnType::Decimal(spec) = &mapping.from {
2375 spec.ensure_valid()?;
2376 }
2377 if let ColumnType::Decimal(spec) = &mapping.to {
2378 spec.ensure_valid()?;
2379 }
2380 }
2381 }
2382 Ok(())
2383 }
2384}
2385
2386pub fn apply_placeholder_replacements(
2387 schema: &mut Schema,
2388 stats: &InferenceStats,
2389 policy: &PlaceholderPolicy,
2390) -> usize {
2391 let replacement_value = match policy {
2392 PlaceholderPolicy::TreatAsEmpty => String::new(),
2393 PlaceholderPolicy::FillWith(value) => value.clone(),
2394 };
2395 let mut added = 0usize;
2396 for (idx, column) in schema.columns.iter_mut().enumerate() {
2397 let Some(summary) = stats.placeholder_summary(idx) else {
2398 continue;
2399 };
2400 let entries = summary.entries();
2401 if entries.is_empty() {
2402 continue;
2403 }
2404 for (token, _) in entries {
2405 if column
2406 .value_replacements
2407 .iter()
2408 .any(|existing| existing.from == token)
2409 {
2410 continue;
2411 }
2412 column.value_replacements.push(ValueReplacement {
2413 from: token,
2414 to: replacement_value.clone(),
2415 });
2416 added += 1;
2417 }
2418 }
2419 added
2420}
2421
2422#[cfg(test)]
2423mod tests {
2424 use super::*;
2425 use encoding_rs::UTF_8;
2426 use proptest::prelude::*;
2427 use std::io::Write;
2428 use std::str::FromStr;
2429 use tempfile::NamedTempFile;
2430
2431 #[test]
2432 fn infer_schema_with_stats_captures_samples() {
2433 let mut file = NamedTempFile::new().expect("temp file");
2434 writeln!(file, "id,date,value").unwrap();
2435 writeln!(file, "1,2024-01-01T08:30:00Z,$12.34").unwrap();
2436 writeln!(file, "2,2024-01-02T09:45:00Z,$56.78").unwrap();
2437
2438 let policy = PlaceholderPolicy::default();
2439 let (schema, stats) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2440 .expect("infer with stats");
2441
2442 assert_eq!(schema.columns.len(), 3);
2443 assert_eq!(stats.sample_value(1), Some("2024-01-01T08:30:00Z"));
2444 assert_eq!(stats.sample_value(2), Some("$12.34"));
2445 assert_eq!(stats.rows_read(), 2);
2446 assert_eq!(stats.decode_errors(), 0);
2447 }
2448
2449 #[test]
2450 fn format_hint_detects_common_patterns() {
2451 let date_hint = format_hint_for(&ColumnType::Date, Some("2024/01/30"));
2452 assert_eq!(date_hint.as_deref(), Some("Slash-separated date"));
2453
2454 let currency_hint = format_hint_for(&ColumnType::Float, Some("€1,234.50"));
2455 assert_eq!(currency_hint.as_deref(), Some("Currency symbol detected"));
2456
2457 let guid_hint = format_hint_for(
2458 &ColumnType::Guid,
2459 Some("{ABCDEF12-3456-7890-ABCD-EF1234567890}"),
2460 );
2461 assert_eq!(guid_hint.as_deref(), Some("GUID with braces"));
2462 }
2463
2464 #[test]
2465 fn datatype_mappings_convert_string_to_date() {
2466 let mappings = vec![
2467 DatatypeMapping {
2468 from: ColumnType::String,
2469 to: ColumnType::DateTime,
2470 strategy: None,
2471 options: BTreeMap::new(),
2472 },
2473 DatatypeMapping {
2474 from: ColumnType::DateTime,
2475 to: ColumnType::Date,
2476 strategy: None,
2477 options: BTreeMap::new(),
2478 },
2479 ];
2480
2481 let column = ColumnMeta {
2482 name: "event_date".to_string(),
2483 datatype: ColumnType::Date,
2484 rename: None,
2485 value_replacements: Vec::new(),
2486 datatype_mappings: mappings,
2487 };
2488 let schema = Schema {
2489 columns: vec![column],
2490 schema_version: None,
2491 has_headers: true,
2492 };
2493
2494 let mut row = vec!["2024-05-10T13:45:00".to_string()];
2495 schema
2496 .apply_transformations_to_row(&mut row)
2497 .expect("apply datatype mappings");
2498 assert_eq!(row[0], "2024-05-10");
2499 }
2500
2501 #[test]
2502 fn datatype_mappings_round_float_values() {
2503 let mut options = BTreeMap::new();
2504 options.insert("scale".to_string(), Value::from(4));
2505 let mapping = DatatypeMapping {
2506 from: ColumnType::String,
2507 to: ColumnType::Float,
2508 strategy: Some("round".to_string()),
2509 options,
2510 };
2511 let column = ColumnMeta {
2512 name: "measurement".to_string(),
2513 datatype: ColumnType::Float,
2514 rename: None,
2515 value_replacements: Vec::new(),
2516 datatype_mappings: vec![mapping],
2517 };
2518 let schema = Schema {
2519 columns: vec![column],
2520 schema_version: None,
2521 has_headers: true,
2522 };
2523 let mut row = vec!["3.1415926535".to_string()];
2524 schema
2525 .apply_transformations_to_row(&mut row)
2526 .expect("round float");
2527 assert_eq!(row[0], "3.1416");
2528 }
2529
2530 #[test]
2531 fn datatype_mappings_round_currency_values() {
2532 let mut options = BTreeMap::new();
2533 options.insert("scale".to_string(), Value::from(2));
2534 let mapping = DatatypeMapping {
2535 from: ColumnType::String,
2536 to: ColumnType::Currency,
2537 strategy: Some("round".to_string()),
2538 options,
2539 };
2540 let column = ColumnMeta {
2541 name: "price".to_string(),
2542 datatype: ColumnType::Currency,
2543 rename: None,
2544 value_replacements: Vec::new(),
2545 datatype_mappings: vec![mapping],
2546 };
2547 let schema = Schema {
2548 columns: vec![column],
2549 schema_version: None,
2550 has_headers: true,
2551 };
2552 let mut row = vec!["12.345".to_string()];
2553 schema
2554 .apply_transformations_to_row(&mut row)
2555 .expect("round currency");
2556 assert_eq!(row[0], "12.35");
2557 }
2558
2559 #[test]
2560 fn datatype_mappings_preserve_currency_scale_when_unspecified() {
2561 let mapping = DatatypeMapping {
2562 from: ColumnType::String,
2563 to: ColumnType::Currency,
2564 strategy: None,
2565 options: BTreeMap::new(),
2566 };
2567 let column = ColumnMeta {
2568 name: "premium".to_string(),
2569 datatype: ColumnType::Currency,
2570 rename: None,
2571 value_replacements: Vec::new(),
2572 datatype_mappings: vec![mapping],
2573 };
2574 let schema = Schema {
2575 columns: vec![column],
2576 schema_version: None,
2577 has_headers: true,
2578 };
2579 let mut row = vec!["123.4567".to_string()];
2580 schema
2581 .apply_transformations_to_row(&mut row)
2582 .expect("preserve currency scale");
2583 assert_eq!(row[0], "123.4567");
2584 }
2585
2586 #[test]
2587 fn datatype_mappings_convert_currency_to_decimal() {
2588 let spec = DecimalSpec::new(10, 2).expect("decimal spec");
2589 let currency_mapping = DatatypeMapping {
2590 from: ColumnType::String,
2591 to: ColumnType::Currency,
2592 strategy: None,
2593 options: BTreeMap::new(),
2594 };
2595 let decimal_mapping = DatatypeMapping {
2596 from: ColumnType::Currency,
2597 to: ColumnType::Decimal(spec.clone()),
2598 strategy: Some("truncate".to_string()),
2599 options: BTreeMap::new(),
2600 };
2601 let column = ColumnMeta {
2602 name: "amount".to_string(),
2603 datatype: ColumnType::Decimal(spec.clone()),
2604 rename: None,
2605 value_replacements: Vec::new(),
2606 datatype_mappings: vec![currency_mapping, decimal_mapping],
2607 };
2608 let schema = Schema {
2609 columns: vec![column],
2610 schema_version: None,
2611 has_headers: true,
2612 };
2613 let mut row = vec!["$123.4567".to_string()];
2614 schema
2615 .apply_transformations_to_row(&mut row)
2616 .expect("currency to decimal mapping");
2617 assert_eq!(row[0], "123.45");
2618 }
2619
2620 #[test]
2621 fn infer_schema_identifies_currency_columns() {
2622 let mut file = NamedTempFile::new().expect("temp file");
2623 writeln!(file, "amount,name").unwrap();
2624 writeln!(file, "$12.34,alpha").unwrap();
2625 writeln!(file, "56.7800,beta").unwrap();
2626
2627 let policy = PlaceholderPolicy::default();
2628 let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2629 .expect("infer schema");
2630 assert_eq!(schema.columns.len(), 2);
2631 assert_eq!(schema.columns[0].datatype, ColumnType::Currency);
2632 assert_eq!(schema.columns[1].datatype, ColumnType::String);
2633 }
2634
2635 #[test]
2636 fn infer_schema_promotes_currency_when_symbol_ratio_met() {
2637 let mut file = NamedTempFile::new().expect("temp file");
2638 writeln!(file, "amount").unwrap();
2639 writeln!(file, "$12.00").unwrap();
2640 writeln!(file, "14").unwrap();
2641 writeln!(file, "15").unwrap();
2642
2643 let policy = PlaceholderPolicy::default();
2644 let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2645 .expect("infer schema");
2646 assert_eq!(schema.columns.len(), 1);
2647 assert_eq!(schema.columns[0].datatype, ColumnType::Currency);
2648 }
2649
2650 #[test]
2651 fn infer_schema_prefers_decimal_when_fraction_present() {
2652 let mut file = NamedTempFile::new().expect("temp file");
2653 writeln!(file, "amount").unwrap();
2654 writeln!(file, "1").unwrap();
2655 writeln!(file, "2").unwrap();
2656 writeln!(file, "3.5").unwrap();
2657
2658 let policy = PlaceholderPolicy::default();
2659 let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2660 .expect("infer schema");
2661
2662 let expected = DecimalSpec::new(2, 1).expect("valid spec");
2663 match &schema.columns[0].datatype {
2664 ColumnType::Decimal(spec) => assert_eq!(spec, &expected),
2665 other => panic!("expected decimal column, got {other:?}"),
2666 }
2667 }
2668
2669 #[test]
2670 fn infer_schema_supports_scientific_notation_as_decimal() {
2671 let mut file = NamedTempFile::new().expect("temp file");
2672 writeln!(file, "value").unwrap();
2673 writeln!(file, "1e3").unwrap();
2674 writeln!(file, "2.5e-1").unwrap();
2675
2676 let policy = PlaceholderPolicy::default();
2677 let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2678 .expect("infer schema");
2679
2680 let expected = DecimalSpec::new(6, 2).expect("valid spec");
2681 match &schema.columns[0].datatype {
2682 ColumnType::Decimal(spec) => assert_eq!(spec, &expected),
2683 other => panic!("expected decimal column, got {other:?}"),
2684 }
2685 }
2686
2687 #[test]
2688 fn infer_schema_treats_leading_zero_integers_as_string() {
2689 let mut file = NamedTempFile::new().expect("temp file");
2690 writeln!(file, "code").unwrap();
2691 writeln!(file, "001").unwrap();
2692 writeln!(file, "002").unwrap();
2693 writeln!(file, "003").unwrap();
2694
2695 let policy = PlaceholderPolicy::default();
2696 let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2697 .expect("infer schema");
2698
2699 assert_eq!(schema.columns[0].datatype, ColumnType::String);
2700 }
2701
2702 #[test]
2703 fn infer_schema_prioritizes_decimal_over_currency_without_symbols() {
2704 let mut file = NamedTempFile::new().expect("temp file");
2705 writeln!(file, "amount").unwrap();
2706 writeln!(file, "12.34").unwrap();
2707 writeln!(file, "45.67").unwrap();
2708
2709 let policy = PlaceholderPolicy::default();
2710 let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2711 .expect("infer schema");
2712
2713 let expected = DecimalSpec::new(4, 2).expect("valid spec");
2714 match &schema.columns[0].datatype {
2715 ColumnType::Decimal(spec) => assert_eq!(spec, &expected),
2716 other => panic!("expected decimal column, got {other:?}"),
2717 }
2718 }
2719
2720 #[test]
2721 fn analyze_numeric_token_handles_scientific_notation() {
2722 let observation =
2723 super::analyze_numeric_token("1e3").expect("scientific notation should be recognized");
2724 assert!(matches!(observation.kind, NumericKind::Decimal));
2725 }
2726
2727 #[test]
2728 fn analyze_numeric_token_handles_scientific_with_fraction() {
2729 let observation = super::analyze_numeric_token("2.5e-1")
2730 .expect("scientific notation with fraction should be recognized");
2731 assert!(matches!(observation.kind, NumericKind::Decimal));
2732 assert_eq!(observation.scale, 2);
2733 assert_eq!(observation.precision, 2);
2734 }
2735
2736 #[test]
2737 fn infer_schema_prefers_majority_integer() {
2738 let mut file = NamedTempFile::new().expect("temp file");
2739 writeln!(file, "id,name").unwrap();
2740 writeln!(file, "1,alpha").unwrap();
2741 writeln!(file, "2,beta").unwrap();
2742 writeln!(file, "unknown,gamma").unwrap();
2743
2744 let policy = PlaceholderPolicy::default();
2745 let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2746 .expect("infer schema");
2747 assert_eq!(schema.columns[0].datatype, ColumnType::Integer);
2748 assert_eq!(schema.columns[1].datatype, ColumnType::String);
2749 }
2750
2751 #[test]
2752 fn infer_schema_prefers_majority_boolean() {
2753 let mut file = NamedTempFile::new().expect("temp file");
2754 writeln!(file, "flag").unwrap();
2755 writeln!(file, "true").unwrap();
2756 writeln!(file, "false").unwrap();
2757 writeln!(file, "unknown").unwrap();
2758
2759 let policy = PlaceholderPolicy::default();
2760 let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2761 .expect("infer schema");
2762 assert_eq!(schema.columns.len(), 1);
2763 assert_eq!(schema.columns[0].datatype, ColumnType::Boolean);
2764 }
2765
2766 #[test]
2767 fn infer_schema_collects_na_placeholders() {
2768 let mut file = NamedTempFile::new().expect("temp file");
2769 writeln!(file, "value").unwrap();
2770 writeln!(file, "NA").unwrap();
2771 writeln!(file, "#N/A").unwrap();
2772 writeln!(file, "42").unwrap();
2773
2774 let policy = PlaceholderPolicy::default();
2775 let (_, stats) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2776 .expect("infer stats");
2777
2778 let summary = stats.placeholder_summary(0).expect("placeholder summary");
2779 let entries = summary.entries();
2780 assert_eq!(entries.len(), 2);
2781 assert!(
2782 entries
2783 .iter()
2784 .any(|(token, count)| token == "NA" && *count == 1)
2785 );
2786 assert!(
2787 entries
2788 .iter()
2789 .any(|(token, count)| token == "#N/A" && *count == 1)
2790 );
2791 }
2792
2793 #[test]
2794 fn assume_header_false_forces_field_names() {
2795 let mut file = NamedTempFile::new().expect("temp file");
2796 writeln!(file, "id,value").unwrap();
2797 writeln!(file, "1,2").unwrap();
2798 writeln!(file, "3,4").unwrap();
2799
2800 let policy = PlaceholderPolicy::default();
2801 let (schema, _) =
2802 infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, Some(false))
2803 .expect("force headerless schema");
2804
2805 assert!(!schema.has_headers);
2806 let column_names: Vec<_> = schema.columns.iter().map(|c| c.name.as_str()).collect();
2807 assert_eq!(column_names, vec!["field_0", "field_1"]);
2808 }
2809
2810 #[test]
2811 fn assume_header_true_preserves_first_row_names() {
2812 let mut file = NamedTempFile::new().expect("temp file");
2813 writeln!(file, "100,200").unwrap();
2814 writeln!(file, "1,2").unwrap();
2815 writeln!(file, "3,4").unwrap();
2816
2817 let policy = PlaceholderPolicy::default();
2818 let (schema, stats) =
2819 infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, Some(true))
2820 .expect("assume header true");
2821
2822 assert!(schema.has_headers);
2823 let column_names: Vec<_> = schema.columns.iter().map(|c| c.name.as_str()).collect();
2824 assert_eq!(column_names, vec!["100", "200"]);
2825 assert_eq!(stats.sample_value(0), Some("1"));
2827 }
2828
2829 #[test]
2830 fn apply_placeholder_replacements_respects_policy() {
2831 let mut file = NamedTempFile::new().expect("temp file");
2832 writeln!(file, "value").unwrap();
2833 writeln!(file, "NA").unwrap();
2834 writeln!(file, "#NA").unwrap();
2835 writeln!(file, "7").unwrap();
2836
2837 let policy = PlaceholderPolicy::default();
2838 let (schema, stats) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2839 .expect("infer schema");
2840
2841 let mut schema_empty = schema.clone();
2842 let added_empty = apply_placeholder_replacements(&mut schema_empty, &stats, &policy);
2843 assert_eq!(added_empty, 2);
2844 assert!(
2845 schema_empty.columns[0]
2846 .value_replacements
2847 .iter()
2848 .any(|r| r.from == "NA" && r.to.is_empty())
2849 );
2850 assert!(
2851 schema_empty.columns[0]
2852 .value_replacements
2853 .iter()
2854 .any(|r| r.from == "#NA" && r.to.is_empty())
2855 );
2856
2857 let mut schema_fill = schema.clone();
2858 let fill_policy = PlaceholderPolicy::FillWith("NULL".to_string());
2859 let added_fill = apply_placeholder_replacements(&mut schema_fill, &stats, &fill_policy);
2860 assert_eq!(added_fill, 2);
2861 assert!(
2862 schema_fill.columns[0]
2863 .value_replacements
2864 .iter()
2865 .all(|r| r.to == "NULL")
2866 );
2867
2868 let added_duplicate =
2869 apply_placeholder_replacements(&mut schema_fill, &stats, &fill_policy);
2870 assert_eq!(added_duplicate, 0);
2871 }
2872
2873 #[test]
2874 fn parse_decimal_type_supports_positional_syntax() {
2875 let parsed = ColumnType::from_str("decimal(18,4)").expect("parse decimal positional");
2876 match parsed {
2877 ColumnType::Decimal(spec) => {
2878 assert_eq!(spec.precision, 18);
2879 assert_eq!(spec.scale, 4);
2880 }
2881 other => panic!("expected decimal column, got {other:?}"),
2882 }
2883 }
2884
2885 #[test]
2886 fn parse_decimal_type_supports_named_syntax() {
2887 let parsed =
2888 ColumnType::from_str("decimal(precision=20, scale=6)").expect("parse decimal named");
2889 let spec = parsed
2890 .decimal_spec()
2891 .expect("decimal spec present after parsing");
2892 assert_eq!(spec.precision, 20);
2893 assert_eq!(spec.scale, 6);
2894 }
2895
2896 #[test]
2897 fn parse_decimal_type_rejects_missing_scale() {
2898 let err = ColumnType::from_str("decimal(10)").expect_err("missing scale error");
2899 assert!(
2900 err.to_string()
2901 .contains("Decimal type requires a scale value")
2902 );
2903 }
2904
2905 #[test]
2906 fn schema_parsing_rejects_unsupported_structured_datatype() {
2907 let yaml = r#"
2908columns:
2909 - name: location
2910 datatype:
2911 geography: {}
2912"#;
2913 let err = serde_yaml::from_str::<Schema>(yaml)
2914 .expect_err("unsupported structured datatype should fail");
2915 assert!(
2916 err.to_string()
2917 .contains("Unsupported structured datatype 'geography'")
2918 );
2919 }
2920
2921 #[test]
2922 fn schema_parsing_rejects_decimal_precision_overflow() {
2923 let yaml = r#"
2924columns:
2925 - name: amount
2926 datatype: decimal(29,2)
2927"#;
2928 let err = serde_yaml::from_str::<Schema>(yaml).expect_err("precision overflow should fail");
2929 assert!(err.to_string().contains("Decimal precision must be <="));
2930 }
2931
2932 #[test]
2933 fn decimal_cli_token_formats_precision_and_scale() {
2934 let parsed = ColumnType::from_str("decimal(28,9)").expect("parse decimal for cli token");
2935 assert_eq!(parsed.cli_token(), "decimal(28,9)");
2936 assert_eq!(parsed.signature_token(), "decimal(28,9)");
2937 assert_eq!(parsed.describe(), "decimal(precision=28,scale=9)");
2938 }
2939
2940 #[test]
2941 fn datatype_mappings_convert_string_to_decimal_with_rounding() {
2942 let spec = DecimalSpec::new(12, 2).expect("valid decimal spec");
2943 let mapping = DatatypeMapping {
2944 from: ColumnType::String,
2945 to: ColumnType::Decimal(spec.clone()),
2946 strategy: Some("round".to_string()),
2947 options: BTreeMap::new(),
2948 };
2949 let column = ColumnMeta {
2950 name: "amount".to_string(),
2951 datatype: ColumnType::Decimal(spec.clone()),
2952 rename: None,
2953 value_replacements: Vec::new(),
2954 datatype_mappings: vec![mapping],
2955 };
2956 let schema = Schema {
2957 columns: vec![column],
2958 schema_version: None,
2959 has_headers: true,
2960 };
2961 let mut row = vec!["123.455".to_string()];
2962 schema
2963 .apply_transformations_to_row(&mut row)
2964 .expect("apply decimal rounding mapping");
2965 assert_eq!(row[0], "123.46");
2966 }
2967
2968 #[test]
2969 fn datatype_mappings_convert_string_to_decimal_with_truncation() {
2970 let spec = DecimalSpec::new(14, 3).expect("valid decimal spec");
2971 let mapping = DatatypeMapping {
2972 from: ColumnType::String,
2973 to: ColumnType::Decimal(spec.clone()),
2974 strategy: Some("truncate".to_string()),
2975 options: BTreeMap::new(),
2976 };
2977 let column = ColumnMeta {
2978 name: "measurement".to_string(),
2979 datatype: ColumnType::Decimal(spec.clone()),
2980 rename: None,
2981 value_replacements: Vec::new(),
2982 datatype_mappings: vec![mapping],
2983 };
2984 let schema = Schema {
2985 columns: vec![column],
2986 schema_version: None,
2987 has_headers: true,
2988 };
2989 let mut row = vec!["-87.6549".to_string()];
2990 schema
2991 .apply_transformations_to_row(&mut row)
2992 .expect("apply decimal truncation mapping");
2993 assert_eq!(row[0], "-87.654");
2994 }
2995
2996 fn apply_grouping(value: &str, separator: char) -> String {
2997 let chars: Vec<char> = value.chars().collect();
2998 if chars.len() <= 3 {
2999 return value.to_string();
3000 }
3001 let mut grouped = String::new();
3002 let mut index = chars.len() % 3;
3003 if index == 0 {
3004 index = 3;
3005 }
3006 grouped.extend(&chars[..index]);
3007 while index < chars.len() {
3008 grouped.push(separator);
3009 grouped.extend(&chars[index..index + 3]);
3010 index += 3;
3011 }
3012 grouped
3013 }
3014
3015 fn digit_strategy() -> impl Strategy<Value = char> {
3016 (0u8..=9).prop_map(|d| (b'0' + d) as char)
3017 }
3018
3019 fn numeric_token_strategy() -> impl Strategy<Value = (String, u32, bool, bool)> {
3020 (
3021 1u64..=999_999,
3022 0u32..=4,
3023 any::<bool>(),
3024 any::<bool>(),
3025 any::<bool>(),
3026 prop_oneof![Just('$'), Just('€'), Just('£'), Just('¥')],
3027 proptest::option::of(prop_oneof![Just(','), Just('_'), Just(' ')]),
3028 any::<bool>(),
3029 )
3030 .prop_flat_map(
3031 |(
3032 integer,
3033 scale,
3034 negative,
3035 parentheses,
3036 use_symbol,
3037 symbol_char,
3038 separator,
3039 spaced,
3040 )| {
3041 let fraction_strategy = if scale == 0 {
3042 Just(String::new()).boxed()
3043 } else {
3044 proptest::collection::vec(digit_strategy(), scale as usize)
3045 .prop_map(|digits| digits.into_iter().collect())
3046 .boxed()
3047 };
3048 fraction_strategy.prop_map(move |fraction| {
3049 let mut body = integer.to_string();
3050 if let Some(sep) = separator {
3051 body = apply_grouping(&body, sep);
3052 }
3053 if scale > 0 {
3054 body.push('.');
3055 body.push_str(&fraction);
3056 }
3057 let mut has_symbol = false;
3058 if use_symbol {
3059 has_symbol = true;
3060 body = format!("{}{}", symbol_char, body);
3061 }
3062 let mut formatted = body;
3063 let negative = negative && integer != 0;
3064 let parentheses_active = parentheses && negative;
3065 if negative {
3066 if parentheses_active {
3067 formatted = format!("({formatted})");
3068 } else {
3069 formatted = format!("-{formatted}");
3070 }
3071 }
3072 if spaced {
3073 formatted = format!(" {formatted} ");
3074 }
3075 (formatted, scale, has_symbol, parentheses_active)
3076 })
3077 },
3078 )
3079 }
3080
    // Property test: every token produced by `numeric_token_strategy` must be
    // classified by `analyze_numeric_token`, and the classification must agree
    // with how the token was built.
    proptest! {
        #[test]
        fn analyze_numeric_token_handles_generated_numeric_formats(
            (token, scale, has_symbol, parentheses_active) in numeric_token_strategy()
        ) {
            let observation = super::analyze_numeric_token(&token)
                .expect("generated numeric token should classify");
            // Tokens with fraction digits are decimals carrying that scale;
            // the rest are integers.
            if scale > 0 {
                prop_assert_eq!(observation.kind, NumericKind::Decimal);
                prop_assert_eq!(observation.scale, scale);
            } else {
                prop_assert_eq!(observation.kind, NumericKind::Integer);
            }
            // The currency flag is expected both for an explicit symbol and
            // for negatives written in parentheses.
            prop_assert_eq!(
                observation.has_currency_symbol,
                has_symbol || parentheses_active
            );
        }
    }
3100
3101 #[test]
3102 fn datatype_mappings_reject_unknown_currency_strategy() {
3103 let mut options = BTreeMap::new();
3104 options.insert("scale".to_string(), Value::from(2));
3105 let mapping = DatatypeMapping {
3106 from: ColumnType::String,
3107 to: ColumnType::Currency,
3108 strategy: Some("ceil".to_string()),
3109 options,
3110 };
3111 let column = ColumnMeta {
3112 name: "price".to_string(),
3113 datatype: ColumnType::Currency,
3114 rename: None,
3115 value_replacements: Vec::new(),
3116 datatype_mappings: vec![mapping],
3117 };
3118 let schema = Schema {
3119 columns: vec![column],
3120 schema_version: None,
3121 has_headers: true,
3122 };
3123 let mut row = vec!["12.34".to_string()];
3124 let err = schema
3125 .apply_transformations_to_row(&mut row)
3126 .expect_err("invalid currency strategy should fail");
3127 assert!(err.to_string().contains("Column 'price'"));
3128 assert!(err.chain().any(|source| {
3129 source
3130 .to_string()
3131 .contains("Unsupported currency rounding strategy")
3132 }));
3133 }
3134
3135 #[test]
3136 fn datatype_mappings_reject_decimal_precision_overflow() {
3137 let spec = DecimalSpec::new(8, 2).expect("decimal spec");
3138 let mapping = DatatypeMapping {
3139 from: ColumnType::String,
3140 to: ColumnType::Decimal(spec.clone()),
3141 strategy: None,
3142 options: BTreeMap::new(),
3143 };
3144 let column = ColumnMeta {
3145 name: "amount".to_string(),
3146 datatype: ColumnType::Decimal(spec.clone()),
3147 rename: None,
3148 value_replacements: Vec::new(),
3149 datatype_mappings: vec![mapping],
3150 };
3151 let schema = Schema {
3152 columns: vec![column],
3153 schema_version: None,
3154 has_headers: true,
3155 };
3156 let mut row = vec!["1234567.89".to_string()];
3157 let err = schema
3158 .apply_transformations_to_row(&mut row)
3159 .expect_err("precision overflow should fail");
3160 assert!(err.to_string().contains("Column 'amount'"));
3161 assert!(
3162 err.chain()
3163 .any(|source| source.to_string().contains("must not exceed"))
3164 );
3165 }
3166}