csv_managed/
schema.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, HashSet},
4    fmt,
5    fs::File,
6    io::BufReader,
7    path::Path,
8    str::FromStr,
9};
10
11use anyhow::{Context, Result, anyhow, bail, ensure};
12use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
13use encoding_rs::Encoding;
14use rust_decimal::Decimal;
15use rust_decimal::prelude::FromPrimitive;
16use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
17use serde_yaml::Value;
18use uuid::Uuid;
19
20use crate::{
21    data::{
22        CurrencyValue, FixedDecimalValue, Value as DataValue, parse_currency_decimal,
23        parse_decimal_literal, parse_naive_date, parse_naive_datetime, parse_naive_time,
24        parse_typed_value,
25    },
26    io_utils,
27};
28
/// Largest precision accepted for a decimal column; `DecimalSpec::ensure_valid`
/// rejects any precision or scale above this value.
const DECIMAL_MAX_PRECISION: u32 = 28;
// NOTE(review): the three header-detection constants below are not referenced
// in this part of the file; their meaning is inferred from the names only
// (a percentage threshold, a minimum match count and a sample size) — confirm
// against the header-detection code.
const HEADER_ALIAS_THRESHOLD_PERCENT: usize = 80;
const HEADER_ALIAS_MIN_MATCHES: usize = 4;
const HEADER_DETECTION_SAMPLE_ROWS: usize = 6;

// Lower-case, alphabetically ordered tokens that commonly appear as CSV column
// names. NOTE(review): presumably consulted when guessing whether a first row
// is a header row — confirm against the code that uses it.
const COMMON_HEADER_TOKENS: &[&str] = &[
    "address",
    "amount",
    "category",
    "city",
    "code",
    "country",
    "created",
    "currency",
    "date",
    "description",
    "email",
    "first_name",
    "id",
    "item",
    "last_name",
    "name",
    "phone",
    "price",
    "quantity",
    "state",
    "status",
    "total",
    "type",
    "updated",
    "zip",
];
61
/// Column layout of a CSV source: the header names plus a flag recording
/// whether headers are present.
#[derive(Debug, Clone)]
pub struct CsvLayout {
    /// Column names, in file order.
    pub headers: Vec<String>,
    // NOTE(review): presumably true when the names came from a real header
    // row rather than being synthesised — confirm against the producer.
    pub has_headers: bool,
}

impl CsvLayout {
    /// Returns how many columns the layout describes (the length of `headers`).
    pub fn field_count(&self) -> usize {
        let Self { headers, .. } = self;
        headers.len()
    }
}
73
/// How placeholder tokens found in the data should be handled.
///
/// NOTE(review): the code that applies the policy is outside this part of the
/// file; the variant descriptions below follow the variant names — confirm.
#[derive(Debug, Clone, Default)]
pub enum PlaceholderPolicy {
    /// Default policy: the placeholder is handled like an empty value.
    #[default]
    TreatAsEmpty,
    /// The placeholder is substituted with the carried string.
    FillWith(String),
}
80
/// Tally of placeholder tokens, keyed by the trimmed token text.
#[derive(Debug, Clone, Default)]
pub struct PlaceholderSummary {
    /// Occurrence count per token; the `BTreeMap` keeps tokens sorted.
    counts: BTreeMap<String, usize>,
}

impl PlaceholderSummary {
    /// Returns `true` when no token has been recorded yet.
    pub fn is_empty(&self) -> bool {
        self.counts.is_empty()
    }

    /// Counts one occurrence of `value` after trimming surrounding whitespace.
    ///
    /// Values that are empty once trimmed are ignored.
    pub fn record(&mut self, value: &str) {
        let token = value.trim();
        if !token.is_empty() {
            let tally = self.counts.entry(token.to_owned()).or_default();
            *tally += 1;
        }
    }

    /// Returns an owned `(token, count)` list in ascending token order.
    pub fn entries(&self) -> Vec<(String, usize)> {
        let mut pairs = Vec::with_capacity(self.counts.len());
        for (token, count) in &self.counts {
            pairs.push((token.clone(), *count));
        }
        pairs
    }
}
106
107#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
108pub struct DecimalSpec {
109    pub precision: u32,
110    pub scale: u32,
111}
112
113impl DecimalSpec {
114    pub fn new(precision: u32, scale: u32) -> Result<Self> {
115        let spec = Self { precision, scale };
116        spec.ensure_valid()?;
117        Ok(spec)
118    }
119
120    pub fn ensure_valid(&self) -> Result<()> {
121        ensure!(self.precision > 0, "Decimal precision must be positive");
122        ensure!(
123            self.precision <= DECIMAL_MAX_PRECISION,
124            "Decimal precision must be <= {}",
125            DECIMAL_MAX_PRECISION
126        );
127        ensure!(
128            self.scale <= self.precision,
129            "Decimal scale ({}) cannot exceed precision ({})",
130            self.scale,
131            self.precision
132        );
133        ensure!(
134            self.scale <= DECIMAL_MAX_PRECISION,
135            "Decimal scale must be <= {}",
136            DECIMAL_MAX_PRECISION
137        );
138        Ok(())
139    }
140
141    pub fn signature(&self) -> String {
142        format!("decimal({},{})", self.precision, self.scale)
143    }
144
145    pub fn describe(&self) -> String {
146        format!("decimal(precision={},scale={})", self.precision, self.scale)
147    }
148}
149
/// Datatype assigned to a schema column.
///
/// Every variant is a plain tag except `Decimal`, which carries the
/// precision/scale pair describing the fixed-point layout.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnType {
    String,
    Integer,
    Float,
    Boolean,
    Date,
    DateTime,
    Time,
    Guid,
    Currency,
    /// Fixed-point decimal with an explicit [`DecimalSpec`].
    Decimal(DecimalSpec),
}
163
164impl Serialize for ColumnType {
165    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
166    where
167        S: Serializer,
168    {
169        match self {
170            ColumnType::String => serializer.serialize_str("String"),
171            ColumnType::Integer => serializer.serialize_str("Integer"),
172            ColumnType::Float => serializer.serialize_str("Float"),
173            ColumnType::Boolean => serializer.serialize_str("Boolean"),
174            ColumnType::Date => serializer.serialize_str("Date"),
175            ColumnType::DateTime => serializer.serialize_str("DateTime"),
176            ColumnType::Time => serializer.serialize_str("Time"),
177            ColumnType::Guid => serializer.serialize_str("Guid"),
178            ColumnType::Currency => serializer.serialize_str("Currency"),
179            ColumnType::Decimal(spec) => serializer.serialize_str(&spec.signature()),
180        }
181    }
182}
183
184impl<'de> Deserialize<'de> for ColumnType {
185    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
186    where
187        D: Deserializer<'de>,
188    {
189        let human_readable = deserializer.is_human_readable();
190        #[cfg(test)]
191        {
192            if !human_readable && std::env::var("CSV_MANAGED_DEBUG_COLUMN_TYPE").is_ok() {
193                eprintln!("ColumnType binary deserialize invoked");
194            }
195        }
196        if human_readable {
197            let value = serde_yaml::Value::deserialize(deserializer)?;
198            parse_human_readable_column_type(value).map_err(de::Error::custom)
199        } else {
200            let token = String::deserialize(deserializer)?;
201            ColumnType::from_str(&token).map_err(|err| de::Error::custom(err.to_string()))
202        }
203    }
204}
205
206fn parse_decimal_from_mapping(value: serde_yaml::Value) -> Result<ColumnType> {
207    let mapping = value
208        .as_mapping()
209        .ok_or_else(|| anyhow!("Decimal mapping must be a map with precision/scale"))?;
210
211    let mut precision: Option<u32> = None;
212    let mut scale: Option<u32> = None;
213
214    for (key, val) in mapping {
215        let key_str = key
216            .as_str()
217            .ok_or_else(|| anyhow!("Decimal mapping keys must be strings"))?
218            .to_ascii_lowercase();
219
220        match key_str.as_str() {
221            "precision" => {
222                let parsed = val
223                    .as_u64()
224                    .ok_or_else(|| anyhow!("Decimal precision must be an unsigned integer"))?;
225                precision = Some(parsed as u32);
226            }
227            "scale" => {
228                let parsed = val
229                    .as_u64()
230                    .ok_or_else(|| anyhow!("Decimal scale must be an unsigned integer"))?;
231                scale = Some(parsed as u32);
232            }
233            other => {
234                return Err(anyhow!("Unknown decimal key '{other}'"));
235            }
236        }
237    }
238
239    let precision = precision.ok_or_else(|| anyhow!("Decimal mapping requires precision"))?;
240    let scale = scale.ok_or_else(|| anyhow!("Decimal mapping requires scale"))?;
241    let spec = DecimalSpec::new(precision, scale)?;
242    Ok(ColumnType::Decimal(spec))
243}
244
245fn parse_human_readable_column_type(value: serde_yaml::Value) -> Result<ColumnType> {
246    if let Some(token) = value.as_str() {
247        return ColumnType::from_str(token);
248    }
249
250    if let Some(mapping) = value.as_mapping()
251        && mapping.len() == 1
252        && let Some((key, val)) = mapping.iter().next()
253    {
254        let key_normalized = key
255            .as_str()
256            .ok_or_else(|| anyhow!("Structured datatype key must be a string"))?
257            .trim()
258            .to_ascii_lowercase();
259        return match key_normalized.as_str() {
260            "decimal" => parse_decimal_from_mapping(val.clone()),
261            other => Err(anyhow!("Unsupported structured datatype '{other}'")),
262        };
263    }
264
265    Err(anyhow!(
266        "Unsupported column datatype representation: {value:?}"
267    ))
268}
269
270impl ColumnType {
271    pub fn as_str(&self) -> &'static str {
272        match self {
273            ColumnType::String => "string",
274            ColumnType::Integer => "integer",
275            ColumnType::Float => "float",
276            ColumnType::Boolean => "boolean",
277            ColumnType::Date => "date",
278            ColumnType::DateTime => "datetime",
279            ColumnType::Time => "time",
280            ColumnType::Guid => "guid",
281            ColumnType::Currency => "currency",
282            ColumnType::Decimal(_) => "decimal",
283        }
284    }
285
286    pub fn variants() -> &'static [&'static str] {
287        &[
288            "string",
289            "integer",
290            "float",
291            "boolean",
292            "date",
293            "datetime",
294            "time",
295            "guid",
296            "currency",
297            "decimal(precision,scale)",
298        ]
299    }
300
301    pub fn describe(&self) -> String {
302        match self {
303            ColumnType::Decimal(spec) => spec.describe(),
304            _ => self.as_str().to_string(),
305        }
306    }
307
308    pub fn signature_token(&self) -> String {
309        match self {
310            ColumnType::Decimal(spec) => spec.signature(),
311            _ => self.as_str().to_string(),
312        }
313    }
314
315    pub fn cli_token(&self) -> String {
316        match self {
317            ColumnType::Decimal(spec) => format!("decimal({},{})", spec.precision, spec.scale),
318            _ => self.as_str().to_string(),
319        }
320    }
321
322    pub fn decimal_spec(&self) -> Option<&DecimalSpec> {
323        match self {
324            ColumnType::Decimal(spec) => Some(spec),
325            _ => None,
326        }
327    }
328}
329
330impl fmt::Display for ColumnType {
331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332        write!(f, "{}", self.describe())
333    }
334}
335
336impl std::str::FromStr for ColumnType {
337    type Err = anyhow::Error;
338
339    fn from_str(value: &str) -> Result<Self, Self::Err> {
340        let normalized = value.trim().to_ascii_lowercase();
341        match normalized.as_str() {
342            "string" => Ok(ColumnType::String),
343            "integer" | "int" => Ok(ColumnType::Integer),
344            "float" | "double" => Ok(ColumnType::Float),
345            "boolean" | "bool" => Ok(ColumnType::Boolean),
346            "date" => Ok(ColumnType::Date),
347            "datetime" | "date-time" | "timestamp" => Ok(ColumnType::DateTime),
348            "time" => Ok(ColumnType::Time),
349            "guid" | "uuid" => Ok(ColumnType::Guid),
350            "currency" => Ok(ColumnType::Currency),
351            other if other.starts_with("decimal") => parse_decimal_type(value),
352            _ => Err(anyhow!(
353                "Unknown column type '{value}'. Supported types: {}",
354                ColumnType::variants().join(", ")
355            )),
356        }
357    }
358}
359
360fn parse_decimal_type(value: &str) -> Result<ColumnType> {
361    let trimmed = value.trim();
362    let start = trimmed.find('(').ok_or_else(|| {
363        anyhow!("Decimal type must specify precision and scale, e.g. decimal(18,4)")
364    })?;
365    ensure!(
366        trimmed.ends_with(')'),
367        "Decimal type must close with ')', e.g. decimal(18,4)"
368    );
369    let inner = &trimmed[start + 1..trimmed.len() - 1];
370    let mut precision: Option<u32> = None;
371    let mut scale: Option<u32> = None;
372    let mut positional = Vec::new();
373
374    for part in inner.split(',') {
375        let token = part.trim();
376        if token.is_empty() {
377            continue;
378        }
379        if let Some((key, value)) = token
380            .split_once(['=', ':'])
381            .map(|(k, v)| (k.trim(), v.trim()))
382        {
383            let key_normalized = key.to_ascii_lowercase();
384            let parsed: u32 = value
385                .parse()
386                .with_context(|| format!("Parsing decimal {key}='{value}' in '{token}'"))?;
387            match key_normalized.as_str() {
388                "precision" => {
389                    precision = Some(parsed);
390                }
391                "scale" => {
392                    scale = Some(parsed);
393                }
394                other => {
395                    bail!("Unknown decimal option '{other}' in '{token}'");
396                }
397            }
398        } else {
399            positional.push(token);
400        }
401    }
402
403    if let Some(first) = positional.first()
404        && precision.is_none()
405    {
406        precision =
407            Some(first.parse().with_context(|| {
408                format!("Parsing decimal precision from '{first}' in '{value}'")
409            })?);
410    }
411    if let Some(second) = positional.get(1)
412        && scale.is_none()
413    {
414        scale = Some(
415            second
416                .parse()
417                .with_context(|| format!("Parsing decimal scale from '{second}' in '{value}'"))?,
418        );
419    }
420    ensure!(
421        positional.len() <= 2,
422        "Decimal type accepts at most two positional arguments"
423    );
424
425    let precision = precision
426        .ok_or_else(|| anyhow!("Decimal type requires a precision value, e.g. decimal(18,4)"))?;
427    let scale =
428        scale.ok_or_else(|| anyhow!("Decimal type requires a scale value, e.g. decimal(18,4)"))?;
429
430    let spec = DecimalSpec::new(precision, scale)?;
431    Ok(ColumnType::Decimal(spec))
432}
433
/// A single `from` -> `to` value substitution configured on a column.
// NOTE(review): the code applying replacements is outside this part of the
// file; whether matching is exact or case-sensitive cannot be told from here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ValueReplacement {
    pub from: String,
    pub to: String,
}
439
/// One conversion step from a source datatype to a target datatype.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DatatypeMapping {
    /// Datatype the raw value is first interpreted as.
    pub from: ColumnType,
    /// Datatype produced by the conversion.
    pub to: ColumnType,
    /// Optional conversion strategy name; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub strategy: Option<String>,
    /// Free-form conversion options; omitted from output when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub options: BTreeMap<String, Value>,
}
449
/// Schema entry describing a single column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnMeta {
    /// Column name.
    pub name: String,
    /// Datatype of the column.
    pub datatype: ColumnType,
    /// Optional alternative name, stored under the `name_mapping` key and
    /// omitted from output when `None`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        rename = "name_mapping"
    )]
    pub rename: Option<String>,
    /// Value substitutions, written under the `replace` key. The key
    /// `value_replacements` is also accepted on input. Omitted when empty.
    #[serde(
        default,
        rename = "replace",
        alias = "value_replacements",
        skip_serializing_if = "Vec::is_empty"
    )]
    pub value_replacements: Vec<ValueReplacement>,
    /// Datatype conversion steps; omitted from output when empty.
    #[serde(
        default,
        rename = "datatype_mappings",
        skip_serializing_if = "Vec::is_empty"
    )]
    pub datatype_mappings: Vec<DatatypeMapping>,
}
474
/// Ordered column definitions plus file-level schema settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Schema {
    /// Columns in file order.
    pub columns: Vec<ColumnMeta>,
    /// Optional schema version string; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub schema_version: Option<String>,
    /// Header flag; defaults to `Schema::default_has_headers()` when the key
    /// is absent on input.
    #[serde(default = "Schema::default_has_headers")]
    pub has_headers: bool,
}
483
/// Per-column value statistics.
// NOTE(review): the code that fills these fields is outside this part of the
// file; the field notes below are inferred from names and types — confirm.
#[derive(Debug, Clone)]
pub struct ColumnSummary {
    // Presumably the number of non-empty values seen in the column.
    pub non_empty: usize,
    // Presumably `(value, occurrences)` pairs for individually tracked values.
    pub tracked_values: Vec<(String, usize)>,
    // Presumably the count of values that were not tracked individually.
    pub other_values: usize,
}
490
491#[derive(Debug, Clone)]
492pub struct InferenceStats {
493    sample_values: Vec<Option<String>>,
494    rows_read: usize,
495    requested_rows: usize,
496    decode_errors: usize,
497    summaries: Vec<ColumnSummary>,
498    placeholder_summaries: Vec<PlaceholderSummary>,
499}
500
501impl InferenceStats {
502    pub fn sample_value(&self, index: usize) -> Option<&str> {
503        self.sample_values
504            .get(index)
505            .and_then(|value| value.as_deref())
506    }
507
508    pub fn summary(&self, index: usize) -> Option<&ColumnSummary> {
509        self.summaries.get(index)
510    }
511
512    pub fn rows_read(&self) -> usize {
513        self.rows_read
514    }
515
516    pub fn requested_rows(&self) -> usize {
517        self.requested_rows
518    }
519
520    pub fn decode_errors(&self) -> usize {
521        self.decode_errors
522    }
523
524    pub fn placeholder_summary(&self, index: usize) -> Option<&PlaceholderSummary> {
525        self.placeholder_summaries.get(index)
526    }
527}
528
529impl Schema {
530    pub fn from_headers(headers: &[String]) -> Self {
531        let columns = headers
532            .iter()
533            .map(|name| ColumnMeta {
534                name: name.clone(),
535                datatype: ColumnType::String,
536                rename: None,
537                value_replacements: Vec::new(),
538                datatype_mappings: Vec::new(),
539            })
540            .collect();
541        Schema {
542            columns,
543            schema_version: None,
544            has_headers: true,
545        }
546    }
547
548    pub const fn default_has_headers() -> bool {
549        true
550    }
551
552    pub fn expects_headers(&self) -> bool {
553        self.has_headers
554    }
555
556    pub fn column_index(&self, name: &str) -> Option<usize> {
557        self.columns
558            .iter()
559            .position(|c| c.name == name || c.rename.as_deref() == Some(name))
560    }
561
562    pub fn headers(&self) -> Vec<String> {
563        self.columns.iter().map(|c| c.name.clone()).collect()
564    }
565
566    pub fn output_headers(&self) -> Vec<String> {
567        self.columns
568            .iter()
569            .map(|c| c.output_name().to_string())
570            .collect()
571    }
572
573    pub(crate) fn header_alias_sets(&self) -> Vec<HashSet<String>> {
574        self.columns
575            .iter()
576            .map(|column| build_header_aliases(&column.name))
577            .collect()
578    }
579
580    pub fn validate_headers(&self, headers: &[String]) -> Result<()> {
581        if !self.has_headers {
582            return Ok(());
583        }
584        if headers.len() != self.columns.len() {
585            return Err(anyhow!(
586                "Header length mismatch: schema expects {} column(s) but file contains {}",
587                self.columns.len(),
588                headers.len()
589            ));
590        }
591        for (idx, column) in self.columns.iter().enumerate() {
592            let name = headers.get(idx).map(|s| s.as_str()).unwrap_or_default();
593            if column.matches_header(name) {
594                continue;
595            }
596            if let Some(mapped) = column
597                .rename
598                .as_deref()
599                .filter(|value| !value.is_empty() && *value != column.name)
600            {
601                return Err(anyhow!(
602                    "Header mismatch at position {}: expected '{}' (or mapped '{}') but found '{}'",
603                    idx + 1,
604                    column.name,
605                    mapped,
606                    name
607                ));
608            }
609            return Err(anyhow!(
610                "Header mismatch at position {}: expected '{}' but found '{}'",
611                idx + 1,
612                column.name,
613                name
614            ));
615        }
616        Ok(())
617    }
618
619    pub fn save(&self, path: &Path) -> Result<()> {
620        self.save_internal(path, false)
621    }
622
623    pub fn save_with_replace_template(&self, path: &Path) -> Result<()> {
624        self.save_internal(path, true)
625    }
626
627    pub fn to_yaml_string(&self, include_replace_template: bool) -> Result<String> {
628        let value = self.to_yaml_value(include_replace_template)?;
629        serde_yaml::to_string(&value).context("Serializing schema to YAML string")
630    }
631
632    pub fn load(path: &Path) -> Result<Self> {
633        let file = File::open(path).with_context(|| format!("Opening schema file {path:?}"))?;
634        let reader = BufReader::new(file);
635        let schema: Schema = serde_yaml::from_reader(reader).context("Parsing schema YAML")?;
636        schema.validate_datatype_mappings()?;
637        Ok(schema)
638    }
639
640    fn save_internal(&self, path: &Path, include_replace_template: bool) -> Result<()> {
641        let value = self.to_yaml_value(include_replace_template)?;
642        let file = File::create(path).with_context(|| format!("Creating schema file {path:?}"))?;
643        serde_yaml::to_writer(file, &value).context("Writing schema YAML")
644    }
645
646    fn to_yaml_value(&self, include_replace_template: bool) -> Result<Value> {
647        let mut schema = self.clone();
648        if schema.schema_version.is_none() {
649            schema.schema_version = Some(CURRENT_SCHEMA_VERSION.to_string());
650        }
651        schema.validate_datatype_mappings()?;
652
653        let mut value =
654            serde_yaml::to_value(&schema).context("Serializing schema to YAML value")?;
655        if include_replace_template
656            && let Some(columns) = value
657                .get_mut("columns")
658                .and_then(|columns| columns.as_sequence_mut())
659        {
660            for column in columns {
661                if let Some(obj) = column.as_mapping_mut() {
662                    if let Some(existing) = obj.remove(Value::from("value_replacements")) {
663                        obj.insert(Value::from("replace"), existing);
664                    }
665                    let replace_key = Value::from("replace");
666                    if !obj.contains_key(&replace_key) {
667                        obj.insert(replace_key, Value::Sequence(Vec::new()));
668                    }
669                }
670            }
671        }
672        Ok(value)
673    }
674}
675
676fn parse_initial_value(raw: &str, mapping: &DatatypeMapping) -> Result<DataValue> {
677    match mapping.from {
678        ColumnType::String => Ok(DataValue::String(raw.to_string())),
679        _ => parse_with_type(raw, &mapping.from),
680    }
681}
682
683fn parse_with_type(value: &str, ty: &ColumnType) -> Result<DataValue> {
684    let trimmed = value.trim();
685    parse_typed_value(trimmed, ty)
686        .with_context(|| format!("Parsing '{trimmed}' as {ty}"))?
687        .ok_or_else(|| anyhow!("Value is empty after trimming"))
688}
689
690fn value_column_type(value: &DataValue) -> ColumnType {
691    match value {
692        DataValue::String(_) => ColumnType::String,
693        DataValue::Integer(_) => ColumnType::Integer,
694        DataValue::Float(_) => ColumnType::Float,
695        DataValue::Boolean(_) => ColumnType::Boolean,
696        DataValue::Date(_) => ColumnType::Date,
697        DataValue::DateTime(_) => ColumnType::DateTime,
698        DataValue::Time(_) => ColumnType::Time,
699        DataValue::Guid(_) => ColumnType::Guid,
700        DataValue::Decimal(value) => ColumnType::Decimal(
701            DecimalSpec::new(value.precision(), value.scale())
702                .expect("FixedDecimalValue guarantees valid decimal spec"),
703        ),
704        DataValue::Currency(_) => ColumnType::Currency,
705    }
706}
707
708fn apply_single_mapping(mapping: &DatatypeMapping, value: DataValue) -> Result<DataValue> {
709    let strategy = normalized_strategy(mapping);
710    match (&mapping.to, value) {
711        (ColumnType::String, DataValue::String(mut s)) => {
712            if let Some(strategy) = strategy.as_deref() {
713                match strategy {
714                    "trim" => s = s.trim().to_string(),
715                    "lowercase" => s = s.to_ascii_lowercase(),
716                    "uppercase" => s = s.to_ascii_uppercase(),
717                    other => {
718                        bail!("Strategy '{other}' is not valid for string -> string mappings");
719                    }
720                }
721            }
722            Ok(DataValue::String(s))
723        }
724        (ColumnType::String, DataValue::Integer(i)) => Ok(DataValue::String(i.to_string())),
725        (ColumnType::String, DataValue::Float(f)) => {
726            let scale = resolve_scale(mapping);
727            let formatted =
728                if strategy.as_deref() == Some("round") || mapping.from == ColumnType::Float {
729                    format_float_with_scale(f, scale)
730                } else {
731                    f.to_string()
732                };
733            Ok(DataValue::String(formatted))
734        }
735        (ColumnType::String, DataValue::Boolean(b)) => Ok(DataValue::String(b.to_string())),
736        (ColumnType::String, DataValue::Date(d)) => {
737            let fmt = mapping
738                .options
739                .get("format")
740                .and_then(|v| v.as_str())
741                .unwrap_or("%Y-%m-%d");
742            Ok(DataValue::String(d.format(fmt).to_string()))
743        }
744        (ColumnType::String, DataValue::DateTime(dt)) => {
745            let fmt = mapping
746                .options
747                .get("format")
748                .and_then(|v| v.as_str())
749                .unwrap_or("%Y-%m-%d %H:%M:%S");
750            Ok(DataValue::String(dt.format(fmt).to_string()))
751        }
752        (ColumnType::String, DataValue::Time(t)) => {
753            let fmt = mapping
754                .options
755                .get("format")
756                .and_then(|v| v.as_str())
757                .unwrap_or("%H:%M:%S");
758            Ok(DataValue::String(t.format(fmt).to_string()))
759        }
760        (ColumnType::String, DataValue::Guid(g)) => Ok(DataValue::String(g.to_string())),
761        (ColumnType::String, DataValue::Decimal(d)) => Ok(DataValue::String(d.to_string_fixed())),
762        (ColumnType::String, DataValue::Currency(c)) => Ok(DataValue::String(c.to_string_fixed())),
763        (ColumnType::Integer, DataValue::String(s)) => {
764            let parsed = parse_with_type(&s, &ColumnType::Integer)?;
765            if let DataValue::Integer(i) = parsed {
766                Ok(DataValue::Integer(i))
767            } else {
768                unreachable!()
769            }
770        }
771        (ColumnType::Float, DataValue::String(s)) => {
772            let parsed = parse_with_type(&s, &ColumnType::Float)?;
773            let mut value = match parsed {
774                DataValue::Float(f) => f,
775                _ => unreachable!(),
776            };
777            if should_round_float(mapping, strategy.as_deref()) {
778                value = round_float(value, resolve_scale(mapping));
779            }
780            Ok(DataValue::Float(value))
781        }
782        (ColumnType::Currency, DataValue::String(s)) => {
783            let decimal = parse_currency_decimal(&s)?;
784            let scale = explicit_currency_scale(mapping)?
785                .unwrap_or_else(|| default_currency_scale(&decimal));
786            let currency = CurrencyValue::quantize(decimal, scale, strategy.as_deref())?;
787            Ok(DataValue::Currency(currency))
788        }
789        (ColumnType::Decimal(spec), DataValue::String(s)) => {
790            let decimal = parse_decimal_literal(&s)?;
791            let fixed = FixedDecimalValue::from_decimal(decimal, spec, strategy.as_deref())?;
792            Ok(DataValue::Decimal(fixed))
793        }
794        (ColumnType::Boolean, DataValue::String(s)) => {
795            let parsed = parse_with_type(&s, &ColumnType::Boolean)?;
796            if let DataValue::Boolean(b) = parsed {
797                Ok(DataValue::Boolean(b))
798            } else {
799                unreachable!()
800            }
801        }
802        (ColumnType::Date, DataValue::String(s)) => {
803            let parsed = parse_string_to_date(&s, mapping)?;
804            Ok(DataValue::Date(parsed))
805        }
806        (ColumnType::DateTime, DataValue::String(s)) => {
807            let parsed = parse_string_to_datetime(&s, mapping)?;
808            Ok(DataValue::DateTime(parsed))
809        }
810        (ColumnType::Time, DataValue::String(s)) => {
811            let parsed = parse_string_to_time(&s, mapping)?;
812            Ok(DataValue::Time(parsed))
813        }
814        (ColumnType::Guid, DataValue::String(s)) => {
815            let parsed = parse_with_type(&s, &ColumnType::Guid)?;
816            if let DataValue::Guid(g) = parsed {
817                Ok(DataValue::Guid(g))
818            } else {
819                unreachable!()
820            }
821        }
822        (ColumnType::Date, DataValue::DateTime(dt)) => Ok(DataValue::Date(dt.date())),
823        (ColumnType::Time, DataValue::DateTime(dt)) => Ok(DataValue::Time(dt.time())),
824        (ColumnType::Float, DataValue::Integer(i)) => {
825            let mut value = i as f64;
826            if should_round_float(mapping, strategy.as_deref()) {
827                value = round_float(value, resolve_scale(mapping));
828            }
829            Ok(DataValue::Float(value))
830        }
831        (ColumnType::Currency, DataValue::Integer(i)) => {
832            let decimal = Decimal::from(i);
833            let scale = explicit_currency_scale(mapping)?
834                .unwrap_or_else(|| default_currency_scale(&decimal));
835            let currency = CurrencyValue::quantize(decimal, scale, strategy.as_deref())?;
836            Ok(DataValue::Currency(currency))
837        }
838        (ColumnType::Decimal(spec), DataValue::Integer(i)) => {
839            let decimal = Decimal::from(i);
840            let fixed = FixedDecimalValue::from_decimal(decimal, spec, strategy.as_deref())?;
841            Ok(DataValue::Decimal(fixed))
842        }
843        (ColumnType::Integer, DataValue::Float(f)) => {
844            let rounded = match strategy.as_deref() {
845                Some("truncate") => f.trunc() as i64,
846                _ => f.round() as i64,
847            };
848            Ok(DataValue::Integer(rounded))
849        }
850        (ColumnType::Float, DataValue::Float(f)) => {
851            let mut value = f;
852            if should_round_float(mapping, strategy.as_deref()) {
853                value = round_float(value, resolve_scale(mapping));
854            }
855            Ok(DataValue::Float(value))
856        }
857        (ColumnType::Currency, DataValue::Float(f)) => {
858            let decimal = Decimal::from_f64(f)
859                .ok_or_else(|| anyhow!("Failed to convert float {f} to decimal"))?;
860            let scale = explicit_currency_scale(mapping)?
861                .unwrap_or_else(|| default_currency_scale(&decimal));
862            let currency = CurrencyValue::quantize(decimal, scale, strategy.as_deref())?;
863            Ok(DataValue::Currency(currency))
864        }
865        (ColumnType::Decimal(spec), DataValue::Float(f)) => {
866            let decimal = Decimal::from_f64(f)
867                .ok_or_else(|| anyhow!("Failed to convert float {f} to decimal"))?;
868            let fixed = FixedDecimalValue::from_decimal(decimal, spec, strategy.as_deref())?;
869            Ok(DataValue::Decimal(fixed))
870        }
871        (ColumnType::Float, DataValue::Currency(c)) => {
872            let value = c
873                .to_f64()
874                .ok_or_else(|| anyhow!("Currency value out of f64 range"))?;
875            Ok(DataValue::Float(value))
876        }
877        (ColumnType::Integer, DataValue::Currency(c)) => {
878            let f = c
879                .to_f64()
880                .ok_or_else(|| anyhow!("Currency value out of range for integer conversion"))?;
881            let rounded = match strategy.as_deref() {
882                Some("truncate") => f.trunc() as i64,
883                _ => f.round() as i64,
884            };
885            Ok(DataValue::Integer(rounded))
886        }
887        (ColumnType::Currency, DataValue::Currency(c)) => {
888            let decimal = *c.amount();
889            let scale = explicit_currency_scale(mapping)?
890                .unwrap_or_else(|| default_currency_scale(&decimal));
891            let currency = CurrencyValue::quantize(decimal, scale, strategy.as_deref())?;
892            Ok(DataValue::Currency(currency))
893        }
894        (ColumnType::Decimal(spec), DataValue::Currency(c)) => {
895            let fixed = FixedDecimalValue::from_decimal(*c.amount(), spec, strategy.as_deref())?;
896            Ok(DataValue::Decimal(fixed))
897        }
898        (ColumnType::Float, DataValue::Decimal(d)) => {
899            let value = d
900                .to_f64()
901                .ok_or_else(|| anyhow!("Decimal value out of f64 range"))?;
902            Ok(DataValue::Float(value))
903        }
904        (ColumnType::Integer, DataValue::Decimal(d)) => {
905            let value = d
906                .to_f64()
907                .ok_or_else(|| anyhow!("Decimal value out of range for integer conversion"))?;
908            let rounded = match strategy.as_deref() {
909                Some("truncate") => value.trunc() as i64,
910                _ => value.round() as i64,
911            };
912            Ok(DataValue::Integer(rounded))
913        }
914        (ColumnType::Currency, DataValue::Decimal(d)) => {
915            let decimal = *d.amount();
916            let scale = explicit_currency_scale(mapping)?
917                .unwrap_or_else(|| default_currency_scale(&decimal));
918            let currency = CurrencyValue::quantize(decimal, scale, strategy.as_deref())?;
919            Ok(DataValue::Currency(currency))
920        }
921        (ColumnType::Decimal(spec), DataValue::Decimal(existing)) => {
922            if existing.precision() == spec.precision && existing.scale() == spec.scale {
923                Ok(DataValue::Decimal(existing))
924            } else {
925                let fixed =
926                    FixedDecimalValue::from_decimal(*existing.amount(), spec, strategy.as_deref())?;
927                Ok(DataValue::Decimal(fixed))
928            }
929        }
930        (ColumnType::Integer, DataValue::Integer(i)) => Ok(DataValue::Integer(i)),
931        _ => bail!(
932            "Datatype mapping '{}' -> '{}' is not supported",
933            mapping.from,
934            mapping.to
935        ),
936    }
937}
938
939fn render_mapped_value(value: &DataValue, mapping: &DatatypeMapping) -> Result<String> {
940    match (&mapping.to, value) {
941        (ColumnType::String, DataValue::String(s)) => Ok(s.clone()),
942        (ColumnType::Integer, DataValue::Integer(i)) => Ok(i.to_string()),
943        (ColumnType::Float, DataValue::Float(f)) => {
944            let scale = resolve_scale(mapping);
945            Ok(format_float_with_scale(*f, scale))
946        }
947        (ColumnType::Boolean, DataValue::Boolean(b)) => Ok(b.to_string()),
948        (ColumnType::Date, DataValue::Date(d)) => {
949            let fmt = mapping
950                .options
951                .get("format")
952                .and_then(|v| v.as_str())
953                .unwrap_or("%Y-%m-%d");
954            Ok(d.format(fmt).to_string())
955        }
956        (ColumnType::DateTime, DataValue::DateTime(dt)) => {
957            let fmt = mapping
958                .options
959                .get("format")
960                .and_then(|v| v.as_str())
961                .unwrap_or("%Y-%m-%d %H:%M:%S");
962            Ok(dt.format(fmt).to_string())
963        }
964        (ColumnType::Time, DataValue::Time(t)) => {
965            let fmt = mapping
966                .options
967                .get("format")
968                .and_then(|v| v.as_str())
969                .unwrap_or("%H:%M:%S");
970            Ok(t.format(fmt).to_string())
971        }
972        (ColumnType::Guid, DataValue::Guid(g)) => Ok(g.to_string()),
973        (ColumnType::Currency, DataValue::Currency(c)) => Ok(c.to_string_fixed()),
974        (ColumnType::Decimal(spec), DataValue::Decimal(d)) => {
975            if d.scale() == spec.scale && d.precision() == spec.precision {
976                Ok(d.to_string_fixed())
977            } else {
978                let fixed = FixedDecimalValue::from_decimal(*d.amount(), spec, None)?;
979                Ok(fixed.to_string_fixed())
980            }
981        }
982        _ => bail!(
983            "Mapping output type '{:?}' is incompatible with computed value '{:?}'",
984            mapping.to,
985            value_column_type(value)
986        ),
987    }
988}
989
/// Formats `value` with exactly `scale` digits after the decimal point.
///
/// A scale of zero prints no fractional part and no decimal point.
fn format_float_with_scale(value: f64, scale: usize) -> String {
    format!("{value:.scale$}")
}
997
998fn should_round_float(mapping: &DatatypeMapping, strategy: Option<&str>) -> bool {
999    match strategy {
1000        Some("round") => true,
1001        Some(_) => false,
1002        None => mapping.from == ColumnType::Float && mapping.to == ColumnType::Float,
1003    }
1004}
1005
/// Rounds `value` to `scale` fractional digits (half away from zero, as
/// `f64::round` does).
///
/// When scaling by `10^scale` would leave the finite range (a very large
/// `value` or a very large `scale`), no rounding is possible at that
/// magnitude, so `value` is returned unchanged instead of collapsing to
/// infinity or NaN.
fn round_float(value: f64, scale: usize) -> f64 {
    if scale == 0 {
        return value.round();
    }
    // Clamp before casting so an enormous scale cannot wrap to a negative exponent.
    let exponent = scale.min(i32::MAX as usize) as i32;
    let factor = 10f64.powi(exponent);
    let scaled = value * factor;
    if !factor.is_finite() || !scaled.is_finite() {
        return value;
    }
    scaled.round() / factor
}
1014
1015fn resolve_scale(mapping: &DatatypeMapping) -> usize {
1016    mapping
1017        .options
1018        .get("scale")
1019        .and_then(|value| {
1020            value
1021                .as_u64()
1022                .map(|u| u as usize)
1023                .or_else(|| value.as_i64().map(|i| i.max(0) as usize))
1024        })
1025        .unwrap_or(4)
1026}
1027
1028fn explicit_currency_scale(mapping: &DatatypeMapping) -> Result<Option<u32>> {
1029    if let Some(scale) = mapping.options.get("scale") {
1030        let numeric = if let Some(value) = scale.as_u64() {
1031            value
1032        } else if let Some(value) = scale.as_i64() {
1033            ensure!(value >= 0, "Currency scale must be non-negative");
1034            value as u64
1035        } else {
1036            bail!("Currency scale must be numeric");
1037        };
1038        let scale_u32 = numeric as u32;
1039        ensure!(
1040            crate::data::CURRENCY_ALLOWED_SCALES.contains(&scale_u32),
1041            "Currency scale must be 2 or 4"
1042        );
1043        Ok(Some(scale_u32))
1044    } else {
1045        Ok(None)
1046    }
1047}
1048
1049fn default_currency_scale(decimal: &Decimal) -> u32 {
1050    let scale = decimal.scale();
1051    if scale == 0 {
1052        2
1053    } else if crate::data::CURRENCY_ALLOWED_SCALES.contains(&scale) {
1054        scale
1055    } else if scale > 4 {
1056        4
1057    } else {
1058        2
1059    }
1060}
1061
1062fn parse_string_to_date(value: &str, mapping: &DatatypeMapping) -> Result<NaiveDate> {
1063    let trimmed = value.trim();
1064    if let Some(fmt) = mapping.options.get("format").and_then(|v| v.as_str()) {
1065        NaiveDate::parse_from_str(trimmed, fmt)
1066            .with_context(|| format!("Parsing '{trimmed}' with format '{fmt}'"))
1067    } else {
1068        parse_naive_date(trimmed)
1069    }
1070}
1071
1072fn parse_string_to_datetime(value: &str, mapping: &DatatypeMapping) -> Result<NaiveDateTime> {
1073    let trimmed = value.trim();
1074    if let Some(fmt) = mapping.options.get("format").and_then(|v| v.as_str()) {
1075        NaiveDateTime::parse_from_str(trimmed, fmt)
1076            .with_context(|| format!("Parsing '{trimmed}' with format '{fmt}'"))
1077    } else {
1078        parse_naive_datetime(trimmed)
1079    }
1080}
1081
1082fn parse_string_to_time(value: &str, mapping: &DatatypeMapping) -> Result<NaiveTime> {
1083    let trimmed = value.trim();
1084    if let Some(fmt) = mapping.options.get("format").and_then(|v| v.as_str()) {
1085        NaiveTime::parse_from_str(trimmed, fmt)
1086            .with_context(|| format!("Parsing '{trimmed}' with format '{fmt}'"))
1087    } else {
1088        parse_naive_time(trimmed)
1089    }
1090}
1091
1092fn normalized_strategy(mapping: &DatatypeMapping) -> Option<String> {
1093    mapping
1094        .strategy
1095        .as_ref()
1096        .map(|s| s.trim().to_ascii_lowercase())
1097        .filter(|s| !s.is_empty())
1098}
1099
1100fn validate_mapping_options(column_name: &str, mapping: &DatatypeMapping) -> Result<()> {
1101    if let Some(strategy_raw) = mapping.strategy.as_ref() {
1102        let strategy = strategy_raw.trim();
1103        if !strategy.is_empty() {
1104            let normalized = strategy.to_ascii_lowercase();
1105            match normalized.as_str() {
1106                "round" | "trim" | "lowercase" | "uppercase" | "truncate" => {}
1107                other => {
1108                    bail!(
1109                        "Column '{}' mapping {} -> {} uses unsupported strategy '{}'",
1110                        column_name,
1111                        mapping.from,
1112                        mapping.to,
1113                        other
1114                    );
1115                }
1116            }
1117            if matches!(normalized.as_str(), "trim" | "lowercase" | "uppercase") {
1118                ensure!(
1119                    mapping.from == ColumnType::String && mapping.to == ColumnType::String,
1120                    "Column '{}' mapping {} -> {} cannot apply '{}' strategy",
1121                    column_name,
1122                    mapping.from,
1123                    mapping.to,
1124                    strategy
1125                );
1126            }
1127            if normalized == "round" {
1128                ensure!(
1129                    matches!(
1130                        mapping.to,
1131                        ColumnType::Float
1132                            | ColumnType::Integer
1133                            | ColumnType::String
1134                            | ColumnType::Currency
1135                            | ColumnType::Decimal(_)
1136                    ),
1137                    "Column '{}' mapping {} -> {} cannot apply 'round' strategy",
1138                    column_name,
1139                    mapping.from,
1140                    mapping.to
1141                );
1142            }
1143            if normalized == "truncate" {
1144                ensure!(
1145                    matches!(
1146                        mapping.to,
1147                        ColumnType::Integer | ColumnType::Currency | ColumnType::Decimal(_)
1148                    ),
1149                    "Column '{}' mapping {} -> {} cannot apply 'truncate' strategy",
1150                    column_name,
1151                    mapping.from,
1152                    mapping.to
1153                );
1154            }
1155        }
1156    }
1157
1158    if let Some(scale) = mapping.options.get("scale") {
1159        let numeric = if let Some(value) = scale.as_u64() {
1160            value
1161        } else if let Some(value) = scale.as_i64() {
1162            ensure!(
1163                value >= 0,
1164                "Column '{}' mapping {} -> {} requires a non-negative scale",
1165                column_name,
1166                mapping.from,
1167                mapping.to
1168            );
1169            value as u64
1170        } else {
1171            bail!(
1172                "Column '{}' mapping {} -> {} requires 'scale' to be a number",
1173                column_name,
1174                mapping.from,
1175                mapping.to
1176            );
1177        };
1178
1179        if mapping.to == ColumnType::Currency {
1180            ensure!(
1181                crate::data::CURRENCY_ALLOWED_SCALES.contains(&(numeric as u32)),
1182                "Column '{}' mapping {} -> {} requires scale to be 2 or 4",
1183                column_name,
1184                mapping.from,
1185                mapping.to
1186            );
1187        }
1188        if matches!(mapping.to, ColumnType::Decimal(_)) {
1189            bail!(
1190                "Column '{}' mapping {} -> {} should define scale via the decimal datatype rather than a mapping option",
1191                column_name,
1192                mapping.from,
1193                mapping.to
1194            );
1195        }
1196    }
1197
1198    if let Some(format_value) = mapping.options.get("format") {
1199        ensure!(
1200            format_value.as_str().is_some(),
1201            "Column '{}' mapping {} -> {} requires 'format' to be a string",
1202            column_name,
1203            mapping.from,
1204            mapping.to
1205        );
1206    }
1207
1208    if mapping.options.contains_key("precision") {
1209        bail!(
1210            "Column '{}' mapping {} -> {} should define precision via the decimal datatype rather than a mapping option",
1211            column_name,
1212            mapping.from,
1213            mapping.to
1214        );
1215    }
1216
1217    Ok(())
1218}
1219
/// Running tally of how the sampled values of one column classify, used to
/// infer the column's type.
#[derive(Debug, Clone)]
struct TypeCandidate {
    /// Values counted after skipping blanks and placeholder tokens.
    non_empty: usize,
    /// Values matching a boolean literal (`true`/`false`, `t`/`f`, `yes`/`no`, `y`/`n`).
    boolean_matches: usize,
    /// Values that `analyze_numeric_token` classified as integers.
    integer_matches: usize,
    /// Largest digit count seen among integer values.
    integer_max_digits: u32,
    /// Values classified as floats, plus decimal values flagged as overflowing.
    float_matches: usize,
    /// Values that `analyze_numeric_token` classified as decimals.
    decimal_matches: usize,
    /// Largest precision seen among decimal values.
    decimal_max_precision: u32,
    /// Largest scale seen among decimal values.
    decimal_max_scale: u32,
    /// Largest integer-digit count seen among decimal values.
    decimal_max_integer_digits: u32,
    /// Set once any decimal value reports an overflow.
    decimal_precision_overflow: bool,
    /// Non-boolean, non-numeric values that parsed as a date.
    date_matches: usize,
    /// Values that parsed as a date-time after failing to parse as a date.
    datetime_matches: usize,
    /// Values that parsed as a time after failing the date and date-time parsers.
    time_matches: usize,
    /// Values that parsed as a UUID (surrounding braces ignored) after failing the other parsers.
    guid_matches: usize,
    /// Integer or decimal values whose scale fits a currency scale.
    currency_matches: usize,
    /// Numeric values flagged as carrying a currency hint.
    currency_symbol_hits: usize,
    /// Values that matched none of the recognised shapes.
    unclassified: usize,
}
1240
/// Classification assigned to a numeric token by `analyze_numeric_token`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NumericKind {
    /// Plain digits with no decimal point or exponent, within the decimal precision limit.
    Integer,
    /// Token with a decimal point, an exponent, or a non-zero scale.
    Decimal,
    /// Plain digits too long for the decimal precision limit.
    Float,
}
1247
/// Shape of a single numeric token as measured by `analyze_numeric_token`.
#[derive(Debug, Clone, Copy)]
struct NumericObservation {
    /// Which numeric family the token falls into.
    kind: NumericKind,
    /// Digit count needed to represent the token (at least 1 for integers and decimals, 0 for floats).
    precision: u32,
    /// Digits after the decimal point once any exponent is applied.
    scale: u32,
    /// Digits before the decimal point once any exponent is applied.
    integer_digits: u32,
    /// True when the token carried a currency symbol or was wrapped in parentheses.
    has_currency_symbol: bool,
    /// True when the scale is zero or one of the allowed currency scales.
    fits_currency_scale: bool,
    /// True when precision or scale exceeds `DECIMAL_MAX_PRECISION`.
    overflow: bool,
}
1258
1259impl NumericObservation {
1260    fn integer(integer_digits: u32, has_currency_symbol: bool) -> Self {
1261        Self {
1262            kind: NumericKind::Integer,
1263            precision: integer_digits,
1264            scale: 0,
1265            integer_digits,
1266            has_currency_symbol,
1267            fits_currency_scale: true,
1268            overflow: false,
1269        }
1270    }
1271
1272    fn decimal(
1273        precision: u32,
1274        scale: u32,
1275        integer_digits: u32,
1276        has_currency_symbol: bool,
1277        fits_currency_scale: bool,
1278        overflow: bool,
1279    ) -> Self {
1280        Self {
1281            kind: NumericKind::Decimal,
1282            precision,
1283            scale,
1284            integer_digits,
1285            has_currency_symbol,
1286            fits_currency_scale,
1287            overflow,
1288        }
1289    }
1290
1291    fn float(has_currency_symbol: bool) -> Self {
1292        Self {
1293            kind: NumericKind::Float,
1294            precision: 0,
1295            scale: 0,
1296            integer_digits: 0,
1297            has_currency_symbol,
1298            fits_currency_scale: false,
1299            overflow: false,
1300        }
1301    }
1302}
1303
1304fn analyze_numeric_token(value: &str) -> Option<NumericObservation> {
1305    let trimmed = value.trim();
1306    if trimmed.is_empty() {
1307        return None;
1308    }
1309
1310    let mut body = trimmed;
1311    let mut had_parentheses = false;
1312    if body.starts_with('(') && body.ends_with(')') && body.len() > 2 {
1313        had_parentheses = true;
1314        body = &body[1..body.len() - 1];
1315    }
1316
1317    body = body.trim();
1318    if body.is_empty() {
1319        return None;
1320    }
1321
1322    let mut mantissa = String::with_capacity(body.len());
1323    let mut exponent = String::new();
1324    let mut in_exponent = false;
1325    let mut exponent_sign_allowed = false;
1326    let mut decimal_index: Option<usize> = None;
1327    let mut has_currency_symbol = false;
1328    let mut sign_consumed = had_parentheses;
1329
1330    for ch in body.chars() {
1331        match ch {
1332            '0'..='9' => {
1333                if in_exponent {
1334                    exponent.push(ch);
1335                } else {
1336                    mantissa.push(ch);
1337                }
1338            }
1339            '.' => {
1340                if in_exponent || decimal_index.is_some() {
1341                    return None;
1342                }
1343                decimal_index = Some(mantissa.len());
1344            }
1345            'e' | 'E' => {
1346                if in_exponent {
1347                    return None;
1348                }
1349                in_exponent = true;
1350                exponent_sign_allowed = true;
1351                continue;
1352            }
1353            '+' | '-' => {
1354                if in_exponent && exponent_sign_allowed {
1355                    exponent.push(ch);
1356                    exponent_sign_allowed = false;
1357                } else if !in_exponent && mantissa.is_empty() && !sign_consumed {
1358                    sign_consumed = true;
1359                } else {
1360                    return None;
1361                }
1362            }
1363            ',' | '_' | ' ' => {
1364                continue;
1365            }
1366            '$' | '€' | '£' | '¥' => {
1367                has_currency_symbol = true;
1368                continue;
1369            }
1370            _ => {
1371                return None;
1372            }
1373        }
1374        if ch != '+' && ch != '-' {
1375            exponent_sign_allowed = false;
1376        }
1377    }
1378
1379    if mantissa.is_empty() {
1380        return None;
1381    }
1382
1383    if decimal_index.is_none()
1384        && !in_exponent
1385        && mantissa.len() > 1
1386        && mantissa.chars().all(|c| c == '0')
1387    {
1388        return None;
1389    }
1390    if decimal_index.is_none() && !in_exponent && mantissa.len() > 1 && mantissa.starts_with('0') {
1391        return None;
1392    }
1393
1394    let mantissa_scale = decimal_index.map(|pos| mantissa.len() - pos).unwrap_or(0);
1395
1396    let exponent_value = if in_exponent {
1397        if exponent.is_empty() || exponent == "+" || exponent == "-" {
1398            return None;
1399        }
1400        match exponent.parse::<i32>() {
1401            Ok(value) => value,
1402            Err(_) => return None,
1403        }
1404    } else {
1405        0
1406    };
1407
1408    let mut digits = mantissa.clone();
1409    let mut scale_i32 = mantissa_scale as i32 - exponent_value;
1410    if scale_i32 < 0 {
1411        let zeros = (-scale_i32) as usize;
1412        digits.push_str(&"0".repeat(zeros));
1413        scale_i32 = 0;
1414    }
1415    let scale = scale_i32.max(0) as u32;
1416    let digits_len = digits.len() as u32;
1417    let integer_digits = digits_len.saturating_sub(scale);
1418
1419    let mut precision = if digits_len == 0 {
1420        0
1421    } else if integer_digits == 0 {
1422        scale.max(1)
1423    } else {
1424        integer_digits + scale
1425    };
1426    if precision == 0 {
1427        precision = 1;
1428    }
1429
1430    let fits_currency_scale = scale == 0 || crate::data::CURRENCY_ALLOWED_SCALES.contains(&scale);
1431    let overflow = precision > DECIMAL_MAX_PRECISION || scale > DECIMAL_MAX_PRECISION;
1432
1433    if in_exponent || decimal_index.is_some() || scale > 0 {
1434        return Some(NumericObservation::decimal(
1435            precision,
1436            scale,
1437            integer_digits,
1438            has_currency_symbol || had_parentheses,
1439            fits_currency_scale,
1440            overflow,
1441        ));
1442    }
1443
1444    if overflow {
1445        return Some(NumericObservation::float(
1446            has_currency_symbol || had_parentheses,
1447        ));
1448    }
1449
1450    Some(NumericObservation::integer(
1451        integer_digits,
1452        has_currency_symbol || had_parentheses,
1453    ))
1454}
1455
/// Minimum percentage of a column's counted values that must carry a currency
/// hint before `TypeCandidate::should_promote_currency` promotes the column.
const CURRENCY_SYMBOL_PROMOTION_THRESHOLD: usize = 30;
/// Maximum number of distinct values `SummaryAccumulator` tracks individually.
const SUMMARY_TRACKED_LIMIT: usize = 5;
/// Schema version string. NOTE(review): presumably written into saved schema
/// files; confirm at the use sites, which are outside this section.
const CURRENT_SCHEMA_VERSION: &str = "1.1.0";
1459
/// Collects a small frequency summary of the values recorded for one column.
#[derive(Clone, Default)]
struct SummaryAccumulator {
    /// Total number of values passed to `record`.
    non_empty: usize,
    /// Up to `SUMMARY_TRACKED_LIMIT` distinct values with their occurrence counts,
    /// in first-seen order.
    tracked: Vec<(String, usize)>,
    /// Occurrences of values that arrived after the tracked list was full.
    other_values: usize,
}
1466
1467impl SummaryAccumulator {
1468    fn record(&mut self, value: &str) {
1469        self.non_empty += 1;
1470        if let Some((_, count)) = self
1471            .tracked
1472            .iter_mut()
1473            .find(|(existing, _)| existing == value)
1474        {
1475            *count += 1;
1476            return;
1477        }
1478        if self.tracked.len() < SUMMARY_TRACKED_LIMIT {
1479            self.tracked.push((value.to_string(), 1));
1480        } else {
1481            self.other_values += 1;
1482        }
1483    }
1484
1485    fn finalize(self) -> ColumnSummary {
1486        ColumnSummary {
1487            non_empty: self.non_empty,
1488            tracked_values: self.tracked,
1489            other_values: self.other_values,
1490        }
1491    }
1492}
1493
1494impl TypeCandidate {
1495    fn new() -> Self {
1496        Self {
1497            non_empty: 0,
1498            boolean_matches: 0,
1499            integer_matches: 0,
1500            integer_max_digits: 0,
1501            float_matches: 0,
1502            decimal_matches: 0,
1503            decimal_max_precision: 0,
1504            decimal_max_scale: 0,
1505            decimal_max_integer_digits: 0,
1506            decimal_precision_overflow: false,
1507            date_matches: 0,
1508            datetime_matches: 0,
1509            time_matches: 0,
1510            guid_matches: 0,
1511            currency_matches: 0,
1512            currency_symbol_hits: 0,
1513            unclassified: 0,
1514        }
1515    }
1516
1517    fn update(&mut self, value: &str) {
1518        let trimmed = value.trim();
1519        if trimmed.is_empty() {
1520            return;
1521        }
1522
1523        let lowered = trimmed.to_ascii_lowercase();
1524        if is_placeholder_token(&lowered) {
1525            return;
1526        }
1527
1528        self.non_empty += 1;
1529        let mut parsed_any = false;
1530
1531        if matches!(
1532            lowered.as_str(),
1533            "true" | "false" | "t" | "f" | "yes" | "no" | "y" | "n"
1534        ) {
1535            self.boolean_matches += 1;
1536            parsed_any = true;
1537        }
1538
1539        if let Some(observation) = analyze_numeric_token(trimmed) {
1540            parsed_any = true;
1541            match observation.kind {
1542                NumericKind::Integer => {
1543                    self.integer_matches += 1;
1544                    self.integer_max_digits =
1545                        self.integer_max_digits.max(observation.integer_digits);
1546                    if observation.fits_currency_scale {
1547                        self.currency_matches += 1;
1548                    }
1549                }
1550                NumericKind::Decimal => {
1551                    self.decimal_matches += 1;
1552                    self.decimal_max_precision =
1553                        self.decimal_max_precision.max(observation.precision);
1554                    self.decimal_max_scale = self.decimal_max_scale.max(observation.scale);
1555                    self.decimal_max_integer_digits = self
1556                        .decimal_max_integer_digits
1557                        .max(observation.integer_digits);
1558                    if observation.fits_currency_scale {
1559                        self.currency_matches += 1;
1560                    }
1561                    if observation.overflow {
1562                        self.decimal_precision_overflow = true;
1563                        self.float_matches += 1;
1564                    }
1565                }
1566                NumericKind::Float => {
1567                    self.float_matches += 1;
1568                }
1569            }
1570            if observation.has_currency_symbol {
1571                self.currency_symbol_hits += 1;
1572            }
1573        }
1574
1575        if !parsed_any && parse_naive_date(trimmed).is_ok() {
1576            self.date_matches += 1;
1577            parsed_any = true;
1578        }
1579        if !parsed_any && parse_naive_datetime(trimmed).is_ok() {
1580            self.datetime_matches += 1;
1581            parsed_any = true;
1582        }
1583        if !parsed_any && parse_naive_time(trimmed).is_ok() {
1584            self.time_matches += 1;
1585            parsed_any = true;
1586        }
1587
1588        let trimmed_guid = trimmed.trim_matches(|c| matches!(c, '{' | '}'));
1589        if !parsed_any && Uuid::parse_str(trimmed_guid).is_ok() {
1590            self.guid_matches += 1;
1591            parsed_any = true;
1592        }
1593
1594        if !parsed_any {
1595            self.unclassified += 1;
1596        }
1597    }
1598
1599    fn majority(&self, count: usize) -> bool {
1600        count > 0 && count * 2 > self.non_empty
1601    }
1602
1603    fn decimal_spec(&self) -> Option<DecimalSpec> {
1604        if self.decimal_matches == 0 {
1605            return None;
1606        }
1607        if self.decimal_precision_overflow {
1608            return None;
1609        }
1610
1611        let scale = self.decimal_max_scale.min(DECIMAL_MAX_PRECISION);
1612        let integer_digits = self.decimal_max_integer_digits.max(self.integer_max_digits);
1613
1614        let mut precision = if integer_digits == 0 {
1615            scale.max(1)
1616        } else {
1617            integer_digits + scale
1618        };
1619        precision = precision.max(self.decimal_max_precision);
1620
1621        if precision > DECIMAL_MAX_PRECISION {
1622            return None;
1623        }
1624
1625        DecimalSpec::new(precision, scale).ok()
1626    }
1627
1628    fn decide(&self) -> ColumnType {
1629        if self.non_empty == 0 {
1630            return ColumnType::String;
1631        }
1632        if self.unclassified > 0 {
1633            return ColumnType::String;
1634        }
1635        let promote_currency = self.should_promote_currency();
1636        if self.majority(self.boolean_matches) {
1637            ColumnType::Boolean
1638        } else if promote_currency {
1639            ColumnType::Currency
1640        } else if let Some(spec) = self.decimal_spec() {
1641            ColumnType::Decimal(spec)
1642        } else if self.decimal_matches > 0 {
1643            ColumnType::Float
1644        } else if self.majority(self.integer_matches) {
1645            ColumnType::Integer
1646        } else if self.majority(self.currency_matches) && self.currency_symbol_hits > 0 {
1647            ColumnType::Currency
1648        } else if self.majority(self.float_matches) {
1649            ColumnType::Float
1650        } else if self.majority(self.date_matches) {
1651            ColumnType::Date
1652        } else if self.majority(self.datetime_matches) {
1653            ColumnType::DateTime
1654        } else if self.majority(self.time_matches) {
1655            ColumnType::Time
1656        } else if self.majority(self.guid_matches) {
1657            ColumnType::Guid
1658        } else {
1659            ColumnType::String
1660        }
1661    }
1662
1663    fn currency_symbol_ratio_meets_threshold(&self) -> bool {
1664        if self.non_empty == 0 {
1665            return false;
1666        }
1667        self.currency_symbol_hits.saturating_mul(100)
1668            >= self
1669                .non_empty
1670                .saturating_mul(CURRENCY_SYMBOL_PROMOTION_THRESHOLD)
1671    }
1672
1673    fn should_promote_currency(&self) -> bool {
1674        self.currency_matches > 0
1675            && self.currency_matches == self.non_empty
1676            && self.currency_symbol_ratio_meets_threshold()
1677    }
1678}
1679
/// Reports whether an already-lowercased token is a "no data" placeholder.
///
/// Leading `#` characters are ignored. What remains is a placeholder when it
/// is one of the known words, starts with `invalid`, or contains nothing but
/// dashes (which includes an empty remainder).
fn is_placeholder_token(lowered: &str) -> bool {
    const KNOWN_WORDS: [&str; 7] = ["na", "n/a", "n.a.", "null", "none", "unknown", "missing"];

    let core = lowered.trim_start_matches('#');
    if KNOWN_WORDS.contains(&core) || core.starts_with("invalid") {
        return true;
    }
    !core.chars().any(|c| c != '-')
}
1688
1689fn placeholder_token_original(value: &str) -> Option<String> {
1690    let trimmed = value.trim();
1691    if trimmed.is_empty() {
1692        return None;
1693    }
1694    let lowered = trimmed.to_ascii_lowercase();
1695    if is_placeholder_token(&lowered) {
1696        Some(trimmed.to_string())
1697    } else {
1698        None
1699    }
1700}
1701
/// Builds the set of lowercase aliases under which a header may reappear in a data row.
///
/// For a non-blank header the set contains:
/// - the trimmed header itself;
/// - the text after the last `_`, space or `/`, for each separator present;
/// - the header reduced to ASCII letters, digits, `+` and `-`, plus that
///   reduced form's first-and-last-character pair and its 3- and 4-character
///   prefixes when it is long enough.
///
/// Blank candidates are never inserted. A blank header yields an empty set.
fn build_header_aliases(header: &str) -> HashSet<String> {
    let name = header.trim();
    let mut aliases = HashSet::new();
    if name.is_empty() {
        return aliases;
    }

    let mut add = |candidate: &str| {
        let token = candidate.trim();
        if !token.is_empty() {
            aliases.insert(token.to_ascii_lowercase());
        }
    };

    add(name);

    for sep in ['_', ' ', '/'] {
        if let Some((_, tail)) = name.rsplit_once(sep) {
            add(tail);
        }
    }

    let compact: String = name
        .chars()
        .filter(|ch| ch.is_ascii_alphanumeric() || *ch == '+' || *ch == '-')
        .collect();
    if !compact.is_empty() {
        add(&compact);
        // `compact` is pure ASCII, so bytes and characters coincide.
        if let [first, .., last] = compact.as_bytes() {
            let shorthand = format!("{}{}", *first as char, *last as char);
            add(&shorthand);
        }
        for prefix_len in [3, 4] {
            if let Some(prefix) = compact.get(..prefix_len) {
                add(prefix);
            }
        }
    }

    aliases
}
1750
1751fn row_values_look_like_header<'a, I>(row: I, header_aliases: &[HashSet<String>]) -> bool
1752where
1753    I: IntoIterator<Item = Option<Cow<'a, str>>>,
1754{
1755    let mut alias_hits = 0usize;
1756    let mut non_empty_fields = 0usize;
1757
1758    for (idx, value_opt) in row.into_iter().enumerate() {
1759        if idx >= header_aliases.len() {
1760            break;
1761        }
1762        let Some(value) = value_opt else {
1763            continue;
1764        };
1765        let trimmed = value.trim();
1766        if trimmed.is_empty() {
1767            continue;
1768        }
1769        non_empty_fields += 1;
1770        let lowered = trimmed.to_ascii_lowercase();
1771        if header_aliases[idx].contains(&lowered) {
1772            alias_hits += 1;
1773        }
1774    }
1775
1776    non_empty_fields >= HEADER_ALIAS_MIN_MATCHES
1777        && alias_hits >= HEADER_ALIAS_MIN_MATCHES
1778        && alias_hits.saturating_mul(100)
1779            >= non_empty_fields.saturating_mul(HEADER_ALIAS_THRESHOLD_PERCENT)
1780}
1781
1782fn option_row_looks_like_header(
1783    row: &[Option<String>],
1784    header_aliases: &[HashSet<String>],
1785) -> bool {
1786    row_values_look_like_header(
1787        row.iter().map(|value| value.as_deref().map(Cow::Borrowed)),
1788        header_aliases,
1789    )
1790}
1791
1792pub(crate) fn row_looks_like_header(row: &[String], header_aliases: &[HashSet<String>]) -> bool {
1793    row_values_look_like_header(
1794        row.iter().map(|value| Some(Cow::Borrowed(value.as_str()))),
1795        header_aliases,
1796    )
1797}
1798
/// Produces `count` placeholder column names: `field_0`, `field_1`, and so on.
fn generate_field_names(count: usize) -> Vec<String> {
    let mut names = Vec::with_capacity(count);
    for position in 0..count {
        names.push(format!("field_{position}"));
    }
    names
}
1802
1803fn token_is_common_header(value: &str) -> bool {
1804    if value.is_empty() {
1805        return false;
1806    }
1807    let normalized = value.trim().to_ascii_lowercase();
1808    if normalized.is_empty() {
1809        return false;
1810    }
1811    if COMMON_HEADER_TOKENS
1812        .iter()
1813        .any(|token| normalized == *token)
1814    {
1815        return true;
1816    }
1817    let sanitized = normalized
1818        .chars()
1819        .map(|ch| match ch {
1820            ' ' | '-' | '/' => '_',
1821            other => other,
1822        })
1823        .collect::<String>();
1824    COMMON_HEADER_TOKENS.iter().any(|token| sanitized == *token)
1825}
1826
1827fn value_is_data_like(value: &str) -> bool {
1828    let trimmed = value.trim();
1829    if trimmed.is_empty() {
1830        return false;
1831    }
1832    let lowered = trimmed.to_ascii_lowercase();
1833    if matches!(
1834        lowered.as_str(),
1835        "true" | "false" | "t" | "f" | "yes" | "no" | "y" | "n" | "1" | "0"
1836    ) {
1837        return true;
1838    }
1839    if parse_decimal_literal(trimmed).is_ok() {
1840        return true;
1841    }
1842    if parse_currency_decimal(trimmed).is_ok() {
1843        return true;
1844    }
1845    if trimmed.parse::<i64>().is_ok() {
1846        return true;
1847    }
1848    if trimmed.parse::<f64>().is_ok() {
1849        return true;
1850    }
1851    if parse_naive_datetime(trimmed).is_ok() {
1852        return true;
1853    }
1854    if parse_naive_date(trimmed).is_ok() {
1855        return true;
1856    }
1857    if parse_naive_time(trimmed).is_ok() {
1858        return true;
1859    }
1860    let trimmed_guid = trimmed.trim_matches(|c| matches!(c, '{' | '}'));
1861    Uuid::parse_str(trimmed_guid).is_ok()
1862}
1863
1864fn value_is_header_like(value: &str) -> bool {
1865    let trimmed = value.trim();
1866    if trimmed.is_empty() {
1867        return false;
1868    }
1869    if value_is_data_like(trimmed) {
1870        return false;
1871    }
1872    trimmed.chars().any(|c| c.is_ascii_alphabetic()) || token_is_common_header(trimmed)
1873}
1874
1875fn header_tokens_match_dictionary(row: &[String]) -> bool {
1876    row.iter()
1877        .filter(|value| token_is_common_header(value.trim()))
1878        .count()
1879        >= 2
1880}
1881
1882fn infer_has_header(first_row: &[String], other_rows: &[Vec<String>]) -> bool {
1883    let header_like_first = first_row
1884        .iter()
1885        .filter(|value| value_is_header_like(value))
1886        .count();
1887    let data_like_first = first_row
1888        .iter()
1889        .filter(|value| value_is_data_like(value))
1890        .count();
1891
1892    if header_like_first == 0 && data_like_first == 0 {
1893        return false;
1894    }
1895
1896    if data_like_first > header_like_first {
1897        return false;
1898    }
1899
1900    if other_rows.is_empty() {
1901        return header_like_first >= 2 || header_tokens_match_dictionary(first_row);
1902    }
1903
1904    let mut header_signal = 0usize;
1905    let mut data_signal = 0usize;
1906
1907    for column in 0..first_row.len() {
1908        let first_value = first_row.get(column).map(|s| s.as_str()).unwrap_or("");
1909        let first_is_header = value_is_header_like(first_value);
1910        let first_is_data = value_is_data_like(first_value);
1911
1912        let mut other_has_data = false;
1913        for row in other_rows {
1914            if let Some(value) = row.get(column)
1915                && value_is_data_like(value)
1916            {
1917                other_has_data = true;
1918                break;
1919            }
1920        }
1921
1922        if first_is_header && other_has_data {
1923            header_signal += 1;
1924        } else if first_is_data && other_has_data {
1925            data_signal += 1;
1926        }
1927    }
1928
1929    if header_signal > data_signal {
1930        return true;
1931    }
1932    if data_signal > header_signal {
1933        return false;
1934    }
1935
1936    if header_tokens_match_dictionary(first_row) && header_like_first >= 1 {
1937        return true;
1938    }
1939
1940    header_like_first > data_like_first
1941}
1942
1943pub fn detect_csv_layout(
1944    path: &Path,
1945    delimiter: u8,
1946    encoding: &'static Encoding,
1947    header_override: Option<bool>,
1948) -> Result<CsvLayout> {
1949    if io_utils::is_dash(path) {
1950        return Ok(CsvLayout {
1951            headers: Vec::new(),
1952            has_headers: header_override.unwrap_or(true),
1953        });
1954    }
1955
1956    if let Some(force_header) = header_override {
1957        let mut reader = io_utils::open_csv_reader_from_path(path, delimiter, force_header)?;
1958        if force_header {
1959            let header_record = reader.byte_headers()?.clone();
1960            let headers = io_utils::decode_headers(&header_record, encoding)?;
1961            return Ok(CsvLayout {
1962                headers,
1963                has_headers: true,
1964            });
1965        } else {
1966            let mut record = csv::ByteRecord::new();
1967            let width = if reader.read_byte_record(&mut record)? {
1968                record.len()
1969            } else {
1970                0
1971            };
1972            let headers = generate_field_names(width);
1973            return Ok(CsvLayout {
1974                headers,
1975                has_headers: false,
1976            });
1977        }
1978    }
1979
1980    let mut reader = io_utils::open_csv_reader_from_path(path, delimiter, false)?;
1981    let mut record = csv::ByteRecord::new();
1982    let mut decoded_rows = Vec::new();
1983
1984    while decoded_rows.len() < HEADER_DETECTION_SAMPLE_ROWS
1985        && reader.read_byte_record(&mut record)?
1986    {
1987        let decoded = io_utils::decode_record(&record, encoding)?;
1988        decoded_rows.push(decoded);
1989    }
1990
1991    if decoded_rows.is_empty() {
1992        return Ok(CsvLayout {
1993            headers: Vec::new(),
1994            has_headers: true,
1995        });
1996    }
1997
1998    let first_row = decoded_rows.first().cloned().unwrap_or_default();
1999    let has_header = infer_has_header(&first_row, &decoded_rows[1..]);
2000    let headers = if has_header {
2001        first_row
2002    } else {
2003        generate_field_names(first_row.len())
2004    };
2005
2006    Ok(CsvLayout {
2007        headers,
2008        has_headers: has_header,
2009    })
2010}
2011
2012pub fn infer_schema(
2013    path: &Path,
2014    sample_rows: usize,
2015    delimiter: u8,
2016    encoding: &'static Encoding,
2017    header_override: Option<bool>,
2018) -> Result<Schema> {
2019    let policy = PlaceholderPolicy::default();
2020    let (schema, _stats) = infer_schema_with_stats(
2021        path,
2022        sample_rows,
2023        delimiter,
2024        encoding,
2025        &policy,
2026        header_override,
2027    )?;
2028    Ok(schema)
2029}
2030
2031pub fn infer_schema_with_stats(
2032    path: &Path,
2033    sample_rows: usize,
2034    delimiter: u8,
2035    encoding: &'static Encoding,
2036    _placeholder_policy: &PlaceholderPolicy,
2037    header_override: Option<bool>,
2038) -> Result<(Schema, InferenceStats)> {
2039    let layout = detect_csv_layout(path, delimiter, encoding, header_override)?;
2040    let mut reader = io_utils::open_csv_reader_from_path(path, delimiter, layout.has_headers)?;
2041    let headers = if layout.has_headers {
2042        let header_record = reader.byte_headers()?.clone();
2043        io_utils::decode_headers(&header_record, encoding)?
2044    } else {
2045        layout.headers.clone()
2046    };
2047    let mut candidates = vec![TypeCandidate::new(); headers.len()];
2048    let mut samples = vec![None; headers.len()];
2049    let mut summaries = vec![SummaryAccumulator::default(); headers.len()];
2050    let mut placeholders = vec![PlaceholderSummary::default(); headers.len()];
2051    let header_aliases: Vec<HashSet<String>> = headers
2052        .iter()
2053        .map(|header| build_header_aliases(header))
2054        .collect();
2055
2056    let mut record = csv::ByteRecord::new();
2057    let mut processed = 0usize;
2058    let mut decode_errors = 0usize;
2059    while reader.read_byte_record(&mut record)? {
2060        if sample_rows > 0 && processed >= sample_rows {
2061            break;
2062        }
2063        let mut decoded_row: Vec<Option<String>> = Vec::with_capacity(headers.len());
2064
2065        for field in record.iter().take(headers.len()) {
2066            if field.is_empty() {
2067                decoded_row.push(None);
2068                continue;
2069            }
2070            match io_utils::decode_bytes(field, encoding) {
2071                Ok(decoded) => {
2072                    let trimmed = decoded.trim();
2073                    if trimmed.is_empty() {
2074                        decoded_row.push(None);
2075                        continue;
2076                    }
2077                    let value = trimmed.to_string();
2078                    decoded_row.push(Some(value));
2079                }
2080                Err(_) => {
2081                    decode_errors += 1;
2082                    decoded_row.push(None);
2083                }
2084            }
2085        }
2086
2087        while decoded_row.len() < headers.len() {
2088            decoded_row.push(None);
2089        }
2090
2091        let header_like = option_row_looks_like_header(&decoded_row, &header_aliases);
2092
2093        if header_like {
2094            continue;
2095        }
2096
2097        for (idx, value_opt) in decoded_row.into_iter().enumerate() {
2098            let Some(value) = value_opt else {
2099                continue;
2100            };
2101            if let Some(token) = placeholder_token_original(&value) {
2102                placeholders[idx].record(&token);
2103                continue;
2104            }
2105            candidates[idx].update(&value);
2106            summaries[idx].record(&value);
2107            if samples[idx].is_none() {
2108                samples[idx] = Some(value.clone());
2109            }
2110        }
2111        processed += 1;
2112    }
2113
2114    let columns = headers
2115        .iter()
2116        .enumerate()
2117        .map(|(idx, header)| ColumnMeta {
2118            name: header.clone(),
2119            datatype: candidates[idx].decide(),
2120            rename: None,
2121            value_replacements: Vec::new(),
2122            datatype_mappings: Vec::new(),
2123        })
2124        .collect();
2125
2126    let schema = Schema {
2127        columns,
2128        schema_version: None,
2129        has_headers: layout.has_headers,
2130    };
2131    let stats = InferenceStats {
2132        sample_values: samples,
2133        rows_read: processed,
2134        requested_rows: sample_rows,
2135        decode_errors,
2136        summaries: summaries
2137            .into_iter()
2138            .map(SummaryAccumulator::finalize)
2139            .collect(),
2140        placeholder_summaries: placeholders,
2141    };
2142
2143    Ok((schema, stats))
2144}
2145
2146pub(crate) fn format_hint_for(datatype: &ColumnType, sample: Option<&str>) -> Option<String> {
2147    let sample = sample?;
2148    match datatype {
2149        ColumnType::DateTime => {
2150            if sample.contains('T') {
2151                Some("ISO 8601 date-time".to_string())
2152            } else if sample.contains('/') {
2153                Some("Slash-separated date-time".to_string())
2154            } else if sample.contains('-') {
2155                Some("Hyphen-separated date-time".to_string())
2156            } else {
2157                Some("Date-time without delimiter hints".to_string())
2158            }
2159        }
2160        ColumnType::Date => {
2161            if sample.contains('/') {
2162                Some("Slash-separated date".to_string())
2163            } else if sample.contains('-') {
2164                Some("Hyphen-separated date".to_string())
2165            } else if sample.contains('.') {
2166                Some("Dot-separated date".to_string())
2167            } else {
2168                Some("Date without delimiter hints".to_string())
2169            }
2170        }
2171        ColumnType::Time => {
2172            if sample.contains('.') {
2173                Some("Time with fractional seconds".to_string())
2174            } else {
2175                Some("Colon-separated time".to_string())
2176            }
2177        }
2178        ColumnType::Boolean => {
2179            let normalized = sample.trim().to_ascii_lowercase();
2180            if matches!(normalized.as_str(), "true" | "false" | "t" | "f") {
2181                Some("Boolean (true/false tokens)".to_string())
2182            } else if matches!(normalized.as_str(), "yes" | "no" | "y" | "n") {
2183                Some("Boolean (yes/no tokens)".to_string())
2184            } else if matches!(normalized.as_str(), "1" | "0") {
2185                Some("Boolean (1/0 tokens)".to_string())
2186            } else {
2187                Some("Boolean (mixed tokens)".to_string())
2188            }
2189        }
2190        ColumnType::Float => {
2191            let has_currency = ["$", "€", "£", "¥"]
2192                .iter()
2193                .any(|symbol| sample.contains(symbol));
2194            if has_currency {
2195                Some("Currency symbol detected".to_string())
2196            } else if sample.contains(',') {
2197                Some("Thousands separator present".to_string())
2198            } else if sample.contains('.') {
2199                Some("Decimal point".to_string())
2200            } else {
2201                Some("Floating number without decimal point".to_string())
2202            }
2203        }
2204        ColumnType::Decimal(spec) => Some(format!(
2205            "Fixed decimal (precision {}, scale {})",
2206            spec.precision, spec.scale
2207        )),
2208        ColumnType::Currency => Some("Currency amount (2 or 4 decimal places)".to_string()),
2209        ColumnType::Integer => {
2210            if sample.starts_with('0') && sample.len() > 1 {
2211                Some("Leading zeros preserved".to_string())
2212            } else {
2213                Some("Whole number".to_string())
2214            }
2215        }
2216        ColumnType::Guid => {
2217            if sample.contains('{') || sample.contains('}') {
2218                Some("GUID with braces".to_string())
2219            } else if sample.contains('-') {
2220                Some("Canonical GUID".to_string())
2221            } else {
2222                Some("GUID without separators".to_string())
2223            }
2224        }
2225        ColumnType::String => None,
2226    }
2227}
2228
2229impl ColumnMeta {
2230    pub fn has_mappings(&self) -> bool {
2231        !self.datatype_mappings.is_empty()
2232    }
2233
2234    pub fn output_name(&self) -> &str {
2235        self.rename
2236            .as_deref()
2237            .filter(|value| !value.is_empty())
2238            .unwrap_or(&self.name)
2239    }
2240
2241    pub fn matches_header(&self, header: &str) -> bool {
2242        if header == self.name {
2243            return true;
2244        }
2245        if let Some(rename) = self.rename.as_deref()
2246            && !rename.is_empty()
2247            && header == rename
2248        {
2249            return true;
2250        }
2251        false
2252    }
2253
2254    pub fn apply_mappings_to_value(&self, value: &str) -> Result<Option<String>> {
2255        if value.is_empty() {
2256            return Ok(None);
2257        }
2258        if !self.has_mappings() {
2259            return Ok(Some(value.to_string()));
2260        }
2261
2262        let first_mapping = self
2263            .datatype_mappings
2264            .first()
2265            .expect("has_mappings() guarantees at least one mapping");
2266
2267        let mut current = parse_initial_value(value, first_mapping)?;
2268        for mapping in &self.datatype_mappings {
2269            let current_type = value_column_type(&current);
2270            ensure!(
2271                current_type == mapping.from,
2272                "Datatype mapping chain expects '{:?}' but encountered '{:?}'",
2273                mapping.from,
2274                current_type
2275            );
2276            current = apply_single_mapping(mapping, current)?;
2277        }
2278
2279        let last_mapping = self
2280            .datatype_mappings
2281            .last()
2282            .expect("non-empty mapping chain");
2283        let rendered = render_mapped_value(&current, last_mapping)?;
2284        if rendered.is_empty() {
2285            Ok(None)
2286        } else {
2287            Ok(Some(rendered))
2288        }
2289    }
2290
2291    pub fn normalize_value<'a>(&self, value: &'a str) -> Cow<'a, str> {
2292        for replacement in &self.value_replacements {
2293            if value == replacement.from {
2294                return Cow::Owned(replacement.to.clone());
2295            }
2296        }
2297        Cow::Borrowed(value)
2298    }
2299}
2300
2301impl Schema {
2302    pub fn has_transformations(&self) -> bool {
2303        self.columns.iter().any(|column| column.has_mappings())
2304    }
2305
2306    pub fn apply_transformations_to_row(&self, row: &mut [String]) -> Result<()> {
2307        for (idx, column) in self.columns.iter().enumerate() {
2308            if !column.has_mappings() {
2309                continue;
2310            }
2311            if let Some(cell) = row.get_mut(idx) {
2312                let original = cell.clone();
2313                match column
2314                    .apply_mappings_to_value(&original)
2315                    .with_context(|| format!("Column '{}'", column.name))?
2316                {
2317                    Some(mapped) => *cell = mapped,
2318                    None => cell.clear(),
2319                }
2320            }
2321        }
2322        Ok(())
2323    }
2324
2325    pub fn apply_replacements_to_row(&self, row: &mut [String]) {
2326        for (idx, column) in self.columns.iter().enumerate() {
2327            if let Some(value) = row.get_mut(idx)
2328                && let Cow::Owned(normalized) = column.normalize_value(value)
2329            {
2330                *value = normalized;
2331            }
2332        }
2333    }
2334
2335    pub fn validate_datatype_mappings(&self) -> Result<()> {
2336        self.validate_decimal_specs()?;
2337        for column in &self.columns {
2338            if column.datatype_mappings.is_empty() {
2339                continue;
2340            }
2341            let mut previous_to = None;
2342            for (step_index, mapping) in column.datatype_mappings.iter().enumerate() {
2343                if let Some(expected) = previous_to.as_ref() {
2344                    ensure!(
2345                        mapping.from == *expected,
2346                        "Column '{}' mapping step {} expects input '{:?}' but prior step outputs '{:?}'",
2347                        column.name,
2348                        step_index + 1,
2349                        mapping.from,
2350                        expected
2351                    );
2352                }
2353                validate_mapping_options(&column.name, mapping)?;
2354                previous_to = Some(mapping.to.clone());
2355            }
2356            let terminal = previous_to.expect("mapping chain must have terminal type");
2357            ensure!(
2358                terminal == column.datatype,
2359                "Column '{}' mappings terminate at '{:?}' but column datatype is '{:?}'",
2360                column.name,
2361                terminal,
2362                column.datatype
2363            );
2364        }
2365        Ok(())
2366    }
2367
2368    fn validate_decimal_specs(&self) -> Result<()> {
2369        for column in &self.columns {
2370            if let ColumnType::Decimal(spec) = &column.datatype {
2371                spec.ensure_valid()?;
2372            }
2373            for mapping in &column.datatype_mappings {
2374                if let ColumnType::Decimal(spec) = &mapping.from {
2375                    spec.ensure_valid()?;
2376                }
2377                if let ColumnType::Decimal(spec) = &mapping.to {
2378                    spec.ensure_valid()?;
2379                }
2380            }
2381        }
2382        Ok(())
2383    }
2384}
2385
2386pub fn apply_placeholder_replacements(
2387    schema: &mut Schema,
2388    stats: &InferenceStats,
2389    policy: &PlaceholderPolicy,
2390) -> usize {
2391    let replacement_value = match policy {
2392        PlaceholderPolicy::TreatAsEmpty => String::new(),
2393        PlaceholderPolicy::FillWith(value) => value.clone(),
2394    };
2395    let mut added = 0usize;
2396    for (idx, column) in schema.columns.iter_mut().enumerate() {
2397        let Some(summary) = stats.placeholder_summary(idx) else {
2398            continue;
2399        };
2400        let entries = summary.entries();
2401        if entries.is_empty() {
2402            continue;
2403        }
2404        for (token, _) in entries {
2405            if column
2406                .value_replacements
2407                .iter()
2408                .any(|existing| existing.from == token)
2409            {
2410                continue;
2411            }
2412            column.value_replacements.push(ValueReplacement {
2413                from: token,
2414                to: replacement_value.clone(),
2415            });
2416            added += 1;
2417        }
2418    }
2419    added
2420}
2421
2422#[cfg(test)]
2423mod tests {
2424    use super::*;
2425    use encoding_rs::UTF_8;
2426    use proptest::prelude::*;
2427    use std::io::Write;
2428    use std::str::FromStr;
2429    use tempfile::NamedTempFile;
2430
2431    #[test]
2432    fn infer_schema_with_stats_captures_samples() {
2433        let mut file = NamedTempFile::new().expect("temp file");
2434        writeln!(file, "id,date,value").unwrap();
2435        writeln!(file, "1,2024-01-01T08:30:00Z,$12.34").unwrap();
2436        writeln!(file, "2,2024-01-02T09:45:00Z,$56.78").unwrap();
2437
2438        let policy = PlaceholderPolicy::default();
2439        let (schema, stats) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2440            .expect("infer with stats");
2441
2442        assert_eq!(schema.columns.len(), 3);
2443        assert_eq!(stats.sample_value(1), Some("2024-01-01T08:30:00Z"));
2444        assert_eq!(stats.sample_value(2), Some("$12.34"));
2445        assert_eq!(stats.rows_read(), 2);
2446        assert_eq!(stats.decode_errors(), 0);
2447    }
2448
2449    #[test]
2450    fn format_hint_detects_common_patterns() {
2451        let date_hint = format_hint_for(&ColumnType::Date, Some("2024/01/30"));
2452        assert_eq!(date_hint.as_deref(), Some("Slash-separated date"));
2453
2454        let currency_hint = format_hint_for(&ColumnType::Float, Some("€1,234.50"));
2455        assert_eq!(currency_hint.as_deref(), Some("Currency symbol detected"));
2456
2457        let guid_hint = format_hint_for(
2458            &ColumnType::Guid,
2459            Some("{ABCDEF12-3456-7890-ABCD-EF1234567890}"),
2460        );
2461        assert_eq!(guid_hint.as_deref(), Some("GUID with braces"));
2462    }
2463
2464    #[test]
2465    fn datatype_mappings_convert_string_to_date() {
2466        let mappings = vec![
2467            DatatypeMapping {
2468                from: ColumnType::String,
2469                to: ColumnType::DateTime,
2470                strategy: None,
2471                options: BTreeMap::new(),
2472            },
2473            DatatypeMapping {
2474                from: ColumnType::DateTime,
2475                to: ColumnType::Date,
2476                strategy: None,
2477                options: BTreeMap::new(),
2478            },
2479        ];
2480
2481        let column = ColumnMeta {
2482            name: "event_date".to_string(),
2483            datatype: ColumnType::Date,
2484            rename: None,
2485            value_replacements: Vec::new(),
2486            datatype_mappings: mappings,
2487        };
2488        let schema = Schema {
2489            columns: vec![column],
2490            schema_version: None,
2491            has_headers: true,
2492        };
2493
2494        let mut row = vec!["2024-05-10T13:45:00".to_string()];
2495        schema
2496            .apply_transformations_to_row(&mut row)
2497            .expect("apply datatype mappings");
2498        assert_eq!(row[0], "2024-05-10");
2499    }
2500
2501    #[test]
2502    fn datatype_mappings_round_float_values() {
2503        let mut options = BTreeMap::new();
2504        options.insert("scale".to_string(), Value::from(4));
2505        let mapping = DatatypeMapping {
2506            from: ColumnType::String,
2507            to: ColumnType::Float,
2508            strategy: Some("round".to_string()),
2509            options,
2510        };
2511        let column = ColumnMeta {
2512            name: "measurement".to_string(),
2513            datatype: ColumnType::Float,
2514            rename: None,
2515            value_replacements: Vec::new(),
2516            datatype_mappings: vec![mapping],
2517        };
2518        let schema = Schema {
2519            columns: vec![column],
2520            schema_version: None,
2521            has_headers: true,
2522        };
2523        let mut row = vec!["3.1415926535".to_string()];
2524        schema
2525            .apply_transformations_to_row(&mut row)
2526            .expect("round float");
2527        assert_eq!(row[0], "3.1416");
2528    }
2529
2530    #[test]
2531    fn datatype_mappings_round_currency_values() {
2532        let mut options = BTreeMap::new();
2533        options.insert("scale".to_string(), Value::from(2));
2534        let mapping = DatatypeMapping {
2535            from: ColumnType::String,
2536            to: ColumnType::Currency,
2537            strategy: Some("round".to_string()),
2538            options,
2539        };
2540        let column = ColumnMeta {
2541            name: "price".to_string(),
2542            datatype: ColumnType::Currency,
2543            rename: None,
2544            value_replacements: Vec::new(),
2545            datatype_mappings: vec![mapping],
2546        };
2547        let schema = Schema {
2548            columns: vec![column],
2549            schema_version: None,
2550            has_headers: true,
2551        };
2552        let mut row = vec!["12.345".to_string()];
2553        schema
2554            .apply_transformations_to_row(&mut row)
2555            .expect("round currency");
2556        assert_eq!(row[0], "12.35");
2557    }
2558
2559    #[test]
2560    fn datatype_mappings_preserve_currency_scale_when_unspecified() {
2561        let mapping = DatatypeMapping {
2562            from: ColumnType::String,
2563            to: ColumnType::Currency,
2564            strategy: None,
2565            options: BTreeMap::new(),
2566        };
2567        let column = ColumnMeta {
2568            name: "premium".to_string(),
2569            datatype: ColumnType::Currency,
2570            rename: None,
2571            value_replacements: Vec::new(),
2572            datatype_mappings: vec![mapping],
2573        };
2574        let schema = Schema {
2575            columns: vec![column],
2576            schema_version: None,
2577            has_headers: true,
2578        };
2579        let mut row = vec!["123.4567".to_string()];
2580        schema
2581            .apply_transformations_to_row(&mut row)
2582            .expect("preserve currency scale");
2583        assert_eq!(row[0], "123.4567");
2584    }
2585
2586    #[test]
2587    fn datatype_mappings_convert_currency_to_decimal() {
2588        let spec = DecimalSpec::new(10, 2).expect("decimal spec");
2589        let currency_mapping = DatatypeMapping {
2590            from: ColumnType::String,
2591            to: ColumnType::Currency,
2592            strategy: None,
2593            options: BTreeMap::new(),
2594        };
2595        let decimal_mapping = DatatypeMapping {
2596            from: ColumnType::Currency,
2597            to: ColumnType::Decimal(spec.clone()),
2598            strategy: Some("truncate".to_string()),
2599            options: BTreeMap::new(),
2600        };
2601        let column = ColumnMeta {
2602            name: "amount".to_string(),
2603            datatype: ColumnType::Decimal(spec.clone()),
2604            rename: None,
2605            value_replacements: Vec::new(),
2606            datatype_mappings: vec![currency_mapping, decimal_mapping],
2607        };
2608        let schema = Schema {
2609            columns: vec![column],
2610            schema_version: None,
2611            has_headers: true,
2612        };
2613        let mut row = vec!["$123.4567".to_string()];
2614        schema
2615            .apply_transformations_to_row(&mut row)
2616            .expect("currency to decimal mapping");
2617        assert_eq!(row[0], "123.45");
2618    }
2619
2620    #[test]
2621    fn infer_schema_identifies_currency_columns() {
2622        let mut file = NamedTempFile::new().expect("temp file");
2623        writeln!(file, "amount,name").unwrap();
2624        writeln!(file, "$12.34,alpha").unwrap();
2625        writeln!(file, "56.7800,beta").unwrap();
2626
2627        let policy = PlaceholderPolicy::default();
2628        let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2629            .expect("infer schema");
2630        assert_eq!(schema.columns.len(), 2);
2631        assert_eq!(schema.columns[0].datatype, ColumnType::Currency);
2632        assert_eq!(schema.columns[1].datatype, ColumnType::String);
2633    }
2634
2635    #[test]
2636    fn infer_schema_promotes_currency_when_symbol_ratio_met() {
2637        let mut file = NamedTempFile::new().expect("temp file");
2638        writeln!(file, "amount").unwrap();
2639        writeln!(file, "$12.00").unwrap();
2640        writeln!(file, "14").unwrap();
2641        writeln!(file, "15").unwrap();
2642
2643        let policy = PlaceholderPolicy::default();
2644        let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2645            .expect("infer schema");
2646        assert_eq!(schema.columns.len(), 1);
2647        assert_eq!(schema.columns[0].datatype, ColumnType::Currency);
2648    }
2649
2650    #[test]
2651    fn infer_schema_prefers_decimal_when_fraction_present() {
2652        let mut file = NamedTempFile::new().expect("temp file");
2653        writeln!(file, "amount").unwrap();
2654        writeln!(file, "1").unwrap();
2655        writeln!(file, "2").unwrap();
2656        writeln!(file, "3.5").unwrap();
2657
2658        let policy = PlaceholderPolicy::default();
2659        let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2660            .expect("infer schema");
2661
2662        let expected = DecimalSpec::new(2, 1).expect("valid spec");
2663        match &schema.columns[0].datatype {
2664            ColumnType::Decimal(spec) => assert_eq!(spec, &expected),
2665            other => panic!("expected decimal column, got {other:?}"),
2666        }
2667    }
2668
2669    #[test]
2670    fn infer_schema_supports_scientific_notation_as_decimal() {
2671        let mut file = NamedTempFile::new().expect("temp file");
2672        writeln!(file, "value").unwrap();
2673        writeln!(file, "1e3").unwrap();
2674        writeln!(file, "2.5e-1").unwrap();
2675
2676        let policy = PlaceholderPolicy::default();
2677        let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2678            .expect("infer schema");
2679
2680        let expected = DecimalSpec::new(6, 2).expect("valid spec");
2681        match &schema.columns[0].datatype {
2682            ColumnType::Decimal(spec) => assert_eq!(spec, &expected),
2683            other => panic!("expected decimal column, got {other:?}"),
2684        }
2685    }
2686
2687    #[test]
2688    fn infer_schema_treats_leading_zero_integers_as_string() {
2689        let mut file = NamedTempFile::new().expect("temp file");
2690        writeln!(file, "code").unwrap();
2691        writeln!(file, "001").unwrap();
2692        writeln!(file, "002").unwrap();
2693        writeln!(file, "003").unwrap();
2694
2695        let policy = PlaceholderPolicy::default();
2696        let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2697            .expect("infer schema");
2698
2699        assert_eq!(schema.columns[0].datatype, ColumnType::String);
2700    }
2701
2702    #[test]
2703    fn infer_schema_prioritizes_decimal_over_currency_without_symbols() {
2704        let mut file = NamedTempFile::new().expect("temp file");
2705        writeln!(file, "amount").unwrap();
2706        writeln!(file, "12.34").unwrap();
2707        writeln!(file, "45.67").unwrap();
2708
2709        let policy = PlaceholderPolicy::default();
2710        let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2711            .expect("infer schema");
2712
2713        let expected = DecimalSpec::new(4, 2).expect("valid spec");
2714        match &schema.columns[0].datatype {
2715            ColumnType::Decimal(spec) => assert_eq!(spec, &expected),
2716            other => panic!("expected decimal column, got {other:?}"),
2717        }
2718    }
2719
2720    #[test]
2721    fn analyze_numeric_token_handles_scientific_notation() {
2722        let observation =
2723            super::analyze_numeric_token("1e3").expect("scientific notation should be recognized");
2724        assert!(matches!(observation.kind, NumericKind::Decimal));
2725    }
2726
2727    #[test]
2728    fn analyze_numeric_token_handles_scientific_with_fraction() {
2729        let observation = super::analyze_numeric_token("2.5e-1")
2730            .expect("scientific notation with fraction should be recognized");
2731        assert!(matches!(observation.kind, NumericKind::Decimal));
2732        assert_eq!(observation.scale, 2);
2733        assert_eq!(observation.precision, 2);
2734    }
2735
2736    #[test]
2737    fn infer_schema_prefers_majority_integer() {
2738        let mut file = NamedTempFile::new().expect("temp file");
2739        writeln!(file, "id,name").unwrap();
2740        writeln!(file, "1,alpha").unwrap();
2741        writeln!(file, "2,beta").unwrap();
2742        writeln!(file, "unknown,gamma").unwrap();
2743
2744        let policy = PlaceholderPolicy::default();
2745        let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2746            .expect("infer schema");
2747        assert_eq!(schema.columns[0].datatype, ColumnType::Integer);
2748        assert_eq!(schema.columns[1].datatype, ColumnType::String);
2749    }
2750
2751    #[test]
2752    fn infer_schema_prefers_majority_boolean() {
2753        let mut file = NamedTempFile::new().expect("temp file");
2754        writeln!(file, "flag").unwrap();
2755        writeln!(file, "true").unwrap();
2756        writeln!(file, "false").unwrap();
2757        writeln!(file, "unknown").unwrap();
2758
2759        let policy = PlaceholderPolicy::default();
2760        let (schema, _) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2761            .expect("infer schema");
2762        assert_eq!(schema.columns.len(), 1);
2763        assert_eq!(schema.columns[0].datatype, ColumnType::Boolean);
2764    }
2765
2766    #[test]
2767    fn infer_schema_collects_na_placeholders() {
2768        let mut file = NamedTempFile::new().expect("temp file");
2769        writeln!(file, "value").unwrap();
2770        writeln!(file, "NA").unwrap();
2771        writeln!(file, "#N/A").unwrap();
2772        writeln!(file, "42").unwrap();
2773
2774        let policy = PlaceholderPolicy::default();
2775        let (_, stats) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2776            .expect("infer stats");
2777
2778        let summary = stats.placeholder_summary(0).expect("placeholder summary");
2779        let entries = summary.entries();
2780        assert_eq!(entries.len(), 2);
2781        assert!(
2782            entries
2783                .iter()
2784                .any(|(token, count)| token == "NA" && *count == 1)
2785        );
2786        assert!(
2787            entries
2788                .iter()
2789                .any(|(token, count)| token == "#N/A" && *count == 1)
2790        );
2791    }
2792
2793    #[test]
2794    fn assume_header_false_forces_field_names() {
2795        let mut file = NamedTempFile::new().expect("temp file");
2796        writeln!(file, "id,value").unwrap();
2797        writeln!(file, "1,2").unwrap();
2798        writeln!(file, "3,4").unwrap();
2799
2800        let policy = PlaceholderPolicy::default();
2801        let (schema, _) =
2802            infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, Some(false))
2803                .expect("force headerless schema");
2804
2805        assert!(!schema.has_headers);
2806        let column_names: Vec<_> = schema.columns.iter().map(|c| c.name.as_str()).collect();
2807        assert_eq!(column_names, vec!["field_0", "field_1"]);
2808    }
2809
2810    #[test]
2811    fn assume_header_true_preserves_first_row_names() {
2812        let mut file = NamedTempFile::new().expect("temp file");
2813        writeln!(file, "100,200").unwrap();
2814        writeln!(file, "1,2").unwrap();
2815        writeln!(file, "3,4").unwrap();
2816
2817        let policy = PlaceholderPolicy::default();
2818        let (schema, stats) =
2819            infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, Some(true))
2820                .expect("assume header true");
2821
2822        assert!(schema.has_headers);
2823        let column_names: Vec<_> = schema.columns.iter().map(|c| c.name.as_str()).collect();
2824        assert_eq!(column_names, vec!["100", "200"]);
2825        // Ensure header row was excluded from samples by checking first sample value
2826        assert_eq!(stats.sample_value(0), Some("1"));
2827    }
2828
2829    #[test]
2830    fn apply_placeholder_replacements_respects_policy() {
2831        let mut file = NamedTempFile::new().expect("temp file");
2832        writeln!(file, "value").unwrap();
2833        writeln!(file, "NA").unwrap();
2834        writeln!(file, "#NA").unwrap();
2835        writeln!(file, "7").unwrap();
2836
2837        let policy = PlaceholderPolicy::default();
2838        let (schema, stats) = infer_schema_with_stats(file.path(), 0, b',', UTF_8, &policy, None)
2839            .expect("infer schema");
2840
2841        let mut schema_empty = schema.clone();
2842        let added_empty = apply_placeholder_replacements(&mut schema_empty, &stats, &policy);
2843        assert_eq!(added_empty, 2);
2844        assert!(
2845            schema_empty.columns[0]
2846                .value_replacements
2847                .iter()
2848                .any(|r| r.from == "NA" && r.to.is_empty())
2849        );
2850        assert!(
2851            schema_empty.columns[0]
2852                .value_replacements
2853                .iter()
2854                .any(|r| r.from == "#NA" && r.to.is_empty())
2855        );
2856
2857        let mut schema_fill = schema.clone();
2858        let fill_policy = PlaceholderPolicy::FillWith("NULL".to_string());
2859        let added_fill = apply_placeholder_replacements(&mut schema_fill, &stats, &fill_policy);
2860        assert_eq!(added_fill, 2);
2861        assert!(
2862            schema_fill.columns[0]
2863                .value_replacements
2864                .iter()
2865                .all(|r| r.to == "NULL")
2866        );
2867
2868        let added_duplicate =
2869            apply_placeholder_replacements(&mut schema_fill, &stats, &fill_policy);
2870        assert_eq!(added_duplicate, 0);
2871    }
2872
2873    #[test]
2874    fn parse_decimal_type_supports_positional_syntax() {
2875        let parsed = ColumnType::from_str("decimal(18,4)").expect("parse decimal positional");
2876        match parsed {
2877            ColumnType::Decimal(spec) => {
2878                assert_eq!(spec.precision, 18);
2879                assert_eq!(spec.scale, 4);
2880            }
2881            other => panic!("expected decimal column, got {other:?}"),
2882        }
2883    }
2884
2885    #[test]
2886    fn parse_decimal_type_supports_named_syntax() {
2887        let parsed =
2888            ColumnType::from_str("decimal(precision=20, scale=6)").expect("parse decimal named");
2889        let spec = parsed
2890            .decimal_spec()
2891            .expect("decimal spec present after parsing");
2892        assert_eq!(spec.precision, 20);
2893        assert_eq!(spec.scale, 6);
2894    }
2895
2896    #[test]
2897    fn parse_decimal_type_rejects_missing_scale() {
2898        let err = ColumnType::from_str("decimal(10)").expect_err("missing scale error");
2899        assert!(
2900            err.to_string()
2901                .contains("Decimal type requires a scale value")
2902        );
2903    }
2904
2905    #[test]
2906    fn schema_parsing_rejects_unsupported_structured_datatype() {
2907        let yaml = r#"
2908columns:
2909  - name: location
2910    datatype:
2911      geography: {}
2912"#;
2913        let err = serde_yaml::from_str::<Schema>(yaml)
2914            .expect_err("unsupported structured datatype should fail");
2915        assert!(
2916            err.to_string()
2917                .contains("Unsupported structured datatype 'geography'")
2918        );
2919    }
2920
2921    #[test]
2922    fn schema_parsing_rejects_decimal_precision_overflow() {
2923        let yaml = r#"
2924columns:
2925  - name: amount
2926    datatype: decimal(29,2)
2927"#;
2928        let err = serde_yaml::from_str::<Schema>(yaml).expect_err("precision overflow should fail");
2929        assert!(err.to_string().contains("Decimal precision must be <="));
2930    }
2931
2932    #[test]
2933    fn decimal_cli_token_formats_precision_and_scale() {
2934        let parsed = ColumnType::from_str("decimal(28,9)").expect("parse decimal for cli token");
2935        assert_eq!(parsed.cli_token(), "decimal(28,9)");
2936        assert_eq!(parsed.signature_token(), "decimal(28,9)");
2937        assert_eq!(parsed.describe(), "decimal(precision=28,scale=9)");
2938    }
2939
2940    #[test]
2941    fn datatype_mappings_convert_string_to_decimal_with_rounding() {
2942        let spec = DecimalSpec::new(12, 2).expect("valid decimal spec");
2943        let mapping = DatatypeMapping {
2944            from: ColumnType::String,
2945            to: ColumnType::Decimal(spec.clone()),
2946            strategy: Some("round".to_string()),
2947            options: BTreeMap::new(),
2948        };
2949        let column = ColumnMeta {
2950            name: "amount".to_string(),
2951            datatype: ColumnType::Decimal(spec.clone()),
2952            rename: None,
2953            value_replacements: Vec::new(),
2954            datatype_mappings: vec![mapping],
2955        };
2956        let schema = Schema {
2957            columns: vec![column],
2958            schema_version: None,
2959            has_headers: true,
2960        };
2961        let mut row = vec!["123.455".to_string()];
2962        schema
2963            .apply_transformations_to_row(&mut row)
2964            .expect("apply decimal rounding mapping");
2965        assert_eq!(row[0], "123.46");
2966    }
2967
2968    #[test]
2969    fn datatype_mappings_convert_string_to_decimal_with_truncation() {
2970        let spec = DecimalSpec::new(14, 3).expect("valid decimal spec");
2971        let mapping = DatatypeMapping {
2972            from: ColumnType::String,
2973            to: ColumnType::Decimal(spec.clone()),
2974            strategy: Some("truncate".to_string()),
2975            options: BTreeMap::new(),
2976        };
2977        let column = ColumnMeta {
2978            name: "measurement".to_string(),
2979            datatype: ColumnType::Decimal(spec.clone()),
2980            rename: None,
2981            value_replacements: Vec::new(),
2982            datatype_mappings: vec![mapping],
2983        };
2984        let schema = Schema {
2985            columns: vec![column],
2986            schema_version: None,
2987            has_headers: true,
2988        };
2989        let mut row = vec!["-87.6549".to_string()];
2990        schema
2991            .apply_transformations_to_row(&mut row)
2992            .expect("apply decimal truncation mapping");
2993        assert_eq!(row[0], "-87.654");
2994    }
2995
2996    fn apply_grouping(value: &str, separator: char) -> String {
2997        let chars: Vec<char> = value.chars().collect();
2998        if chars.len() <= 3 {
2999            return value.to_string();
3000        }
3001        let mut grouped = String::new();
3002        let mut index = chars.len() % 3;
3003        if index == 0 {
3004            index = 3;
3005        }
3006        grouped.extend(&chars[..index]);
3007        while index < chars.len() {
3008            grouped.push(separator);
3009            grouped.extend(&chars[index..index + 3]);
3010            index += 3;
3011        }
3012        grouped
3013    }
3014
3015    fn digit_strategy() -> impl Strategy<Value = char> {
3016        (0u8..=9).prop_map(|d| (b'0' + d) as char)
3017    }
3018
3019    fn numeric_token_strategy() -> impl Strategy<Value = (String, u32, bool, bool)> {
3020        (
3021            1u64..=999_999,
3022            0u32..=4,
3023            any::<bool>(),
3024            any::<bool>(),
3025            any::<bool>(),
3026            prop_oneof![Just('$'), Just('€'), Just('£'), Just('¥')],
3027            proptest::option::of(prop_oneof![Just(','), Just('_'), Just(' ')]),
3028            any::<bool>(),
3029        )
3030            .prop_flat_map(
3031                |(
3032                    integer,
3033                    scale,
3034                    negative,
3035                    parentheses,
3036                    use_symbol,
3037                    symbol_char,
3038                    separator,
3039                    spaced,
3040                )| {
3041                    let fraction_strategy = if scale == 0 {
3042                        Just(String::new()).boxed()
3043                    } else {
3044                        proptest::collection::vec(digit_strategy(), scale as usize)
3045                            .prop_map(|digits| digits.into_iter().collect())
3046                            .boxed()
3047                    };
3048                    fraction_strategy.prop_map(move |fraction| {
3049                        let mut body = integer.to_string();
3050                        if let Some(sep) = separator {
3051                            body = apply_grouping(&body, sep);
3052                        }
3053                        if scale > 0 {
3054                            body.push('.');
3055                            body.push_str(&fraction);
3056                        }
3057                        let mut has_symbol = false;
3058                        if use_symbol {
3059                            has_symbol = true;
3060                            body = format!("{}{}", symbol_char, body);
3061                        }
3062                        let mut formatted = body;
3063                        let negative = negative && integer != 0;
3064                        let parentheses_active = parentheses && negative;
3065                        if negative {
3066                            if parentheses_active {
3067                                formatted = format!("({formatted})");
3068                            } else {
3069                                formatted = format!("-{formatted}");
3070                            }
3071                        }
3072                        if spaced {
3073                            formatted = format!(" {formatted} ");
3074                        }
3075                        (formatted, scale, has_symbol, parentheses_active)
3076                    })
3077                },
3078            )
3079    }
3080
3081    proptest! {
3082        #[test]
3083        fn analyze_numeric_token_handles_generated_numeric_formats(
3084            (token, scale, has_symbol, parentheses_active) in numeric_token_strategy()
3085        ) {
3086            let observation = super::analyze_numeric_token(&token)
3087                .expect("generated numeric token should classify");
3088            if scale > 0 {
3089                prop_assert_eq!(observation.kind, NumericKind::Decimal);
3090                prop_assert_eq!(observation.scale, scale);
3091            } else {
3092                prop_assert_eq!(observation.kind, NumericKind::Integer);
3093            }
3094            prop_assert_eq!(
3095                observation.has_currency_symbol,
3096                has_symbol || parentheses_active
3097            );
3098        }
3099    }
3100
3101    #[test]
3102    fn datatype_mappings_reject_unknown_currency_strategy() {
3103        let mut options = BTreeMap::new();
3104        options.insert("scale".to_string(), Value::from(2));
3105        let mapping = DatatypeMapping {
3106            from: ColumnType::String,
3107            to: ColumnType::Currency,
3108            strategy: Some("ceil".to_string()),
3109            options,
3110        };
3111        let column = ColumnMeta {
3112            name: "price".to_string(),
3113            datatype: ColumnType::Currency,
3114            rename: None,
3115            value_replacements: Vec::new(),
3116            datatype_mappings: vec![mapping],
3117        };
3118        let schema = Schema {
3119            columns: vec![column],
3120            schema_version: None,
3121            has_headers: true,
3122        };
3123        let mut row = vec!["12.34".to_string()];
3124        let err = schema
3125            .apply_transformations_to_row(&mut row)
3126            .expect_err("invalid currency strategy should fail");
3127        assert!(err.to_string().contains("Column 'price'"));
3128        assert!(err.chain().any(|source| {
3129            source
3130                .to_string()
3131                .contains("Unsupported currency rounding strategy")
3132        }));
3133    }
3134
3135    #[test]
3136    fn datatype_mappings_reject_decimal_precision_overflow() {
3137        let spec = DecimalSpec::new(8, 2).expect("decimal spec");
3138        let mapping = DatatypeMapping {
3139            from: ColumnType::String,
3140            to: ColumnType::Decimal(spec.clone()),
3141            strategy: None,
3142            options: BTreeMap::new(),
3143        };
3144        let column = ColumnMeta {
3145            name: "amount".to_string(),
3146            datatype: ColumnType::Decimal(spec.clone()),
3147            rename: None,
3148            value_replacements: Vec::new(),
3149            datatype_mappings: vec![mapping],
3150        };
3151        let schema = Schema {
3152            columns: vec![column],
3153            schema_version: None,
3154            has_headers: true,
3155        };
3156        let mut row = vec!["1234567.89".to_string()];
3157        let err = schema
3158            .apply_transformations_to_row(&mut row)
3159            .expect_err("precision overflow should fail");
3160        assert!(err.to_string().contains("Column 'amount'"));
3161        assert!(
3162            err.chain()
3163                .any(|source| source.to_string().contains("must not exceed"))
3164        );
3165    }
3166}