csv_managed/
schema.rs

1use std::{borrow::Cow, fmt, fs::File, io::BufReader, path::Path};
2
3use anyhow::{Context, Result, anyhow};
4use encoding_rs::Encoding;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use uuid::Uuid;
8
9use crate::{
10    data::{parse_naive_date, parse_naive_datetime},
11    io_utils,
12};
13
/// Data type that a CSV column's values are interpreted as.
///
/// `as_str` returns the canonical lowercase name of each variant and the `FromStr`
/// implementation accepts those names plus a few aliases. The serde derives carry no
/// rename attribute, so serialized schemas use the variant identifiers exactly as
/// written here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ColumnType {
    /// Free-form text; also the type `Schema::from_headers` assigns to every column.
    String,
    /// Whole numbers (schema inference requires sampled values to parse as `i64`).
    Integer,
    /// Floating-point numbers (schema inference requires sampled values to parse as `f64`).
    Float,
    /// Boolean flags (schema inference accepts true/false, t/f, yes/no and y/n,
    /// ignoring ASCII case).
    Boolean,
    /// Calendar dates, as accepted by `parse_naive_date`.
    Date,
    /// Date-and-time values, as accepted by `parse_naive_datetime`.
    DateTime,
    /// UUID values (schema inference tolerates surrounding `{` / `}`).
    Guid,
}
24
25impl ColumnType {
26    pub fn as_str(&self) -> &'static str {
27        match self {
28            ColumnType::String => "string",
29            ColumnType::Integer => "integer",
30            ColumnType::Float => "float",
31            ColumnType::Boolean => "boolean",
32            ColumnType::Date => "date",
33            ColumnType::DateTime => "datetime",
34            ColumnType::Guid => "guid",
35        }
36    }
37
38    pub fn variants() -> &'static [&'static str] {
39        &[
40            "string", "integer", "float", "boolean", "date", "datetime", "guid",
41        ]
42    }
43}
44
45impl fmt::Display for ColumnType {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        write!(f, "{}", self.as_str())
48    }
49}
50
51impl std::str::FromStr for ColumnType {
52    type Err = anyhow::Error;
53
54    fn from_str(value: &str) -> Result<Self, Self::Err> {
55        let normalized = value.trim().to_ascii_lowercase();
56        match normalized.as_str() {
57            "string" => Ok(ColumnType::String),
58            "integer" | "int" => Ok(ColumnType::Integer),
59            "float" | "double" => Ok(ColumnType::Float),
60            "boolean" | "bool" => Ok(ColumnType::Boolean),
61            "date" => Ok(ColumnType::Date),
62            "datetime" | "date-time" | "timestamp" => Ok(ColumnType::DateTime),
63            "guid" | "uuid" => Ok(ColumnType::Guid),
64            _ => Err(anyhow!(
65                "Unknown column type '{value}'. Supported types: {}",
66                ColumnType::variants().join(", ")
67            )),
68        }
69    }
70}
71
/// A single exact-match substitution applied to a column's values.
///
/// `ColumnMeta::normalize_value` replaces a value with `to` only when the whole value
/// equals `from`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ValueReplacement {
    /// The value to look for; compared against the entire field.
    pub from: String,
    /// The value substituted when `from` matches.
    pub to: String,
}
77
/// Description of one column in a [`Schema`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnMeta {
    /// Column name as it appears in the source file's header row.
    pub name: String,
    /// Data type the column's values are interpreted as.
    pub datatype: ColumnType,
    /// Optional output name for the column.
    ///
    /// Stored in the schema file under the key `name_mapping`; the key is omitted when
    /// this is `None` and defaults to `None` when missing.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        rename = "name_mapping"
    )]
    pub rename: Option<String>,
    /// Exact-match replacements applied to this column's values.
    ///
    /// Stored in the schema file under the key `replace` (`value_replacements` is also
    /// accepted when reading); the key is omitted when the list is empty and defaults
    /// to an empty list when missing.
    #[serde(
        default,
        rename = "replace",
        alias = "value_replacements",
        skip_serializing_if = "Vec::is_empty"
    )]
    pub value_replacements: Vec<ValueReplacement>,
}
96
/// Ordered description of the columns of a CSV file.
///
/// Serialized to and from JSON as an object with a single `columns` array.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Schema {
    /// Column descriptions, in the same order as the file's header row.
    pub columns: Vec<ColumnMeta>,
}
101
102impl Schema {
103    pub fn from_headers(headers: &[String]) -> Self {
104        let columns = headers
105            .iter()
106            .map(|name| ColumnMeta {
107                name: name.clone(),
108                datatype: ColumnType::String,
109                rename: None,
110                value_replacements: Vec::new(),
111            })
112            .collect();
113        Schema { columns }
114    }
115
116    pub fn column_index(&self, name: &str) -> Option<usize> {
117        self.columns
118            .iter()
119            .position(|c| c.name == name || c.rename.as_deref() == Some(name))
120    }
121
122    pub fn headers(&self) -> Vec<String> {
123        self.columns.iter().map(|c| c.name.clone()).collect()
124    }
125
126    pub fn output_headers(&self) -> Vec<String> {
127        self.columns
128            .iter()
129            .map(|c| c.output_name().to_string())
130            .collect()
131    }
132
133    pub fn validate_headers(&self, headers: &[String]) -> Result<()> {
134        if headers.len() != self.columns.len() {
135            return Err(anyhow!(
136                "Header length mismatch: schema expects {} column(s) but file contains {}",
137                self.columns.len(),
138                headers.len()
139            ));
140        }
141        for (idx, column) in self.columns.iter().enumerate() {
142            let name = headers.get(idx).map(|s| s.as_str()).unwrap_or_default();
143            if name != column.name {
144                return Err(anyhow!(
145                    "Header mismatch at position {}: expected '{}' but found '{}'",
146                    idx + 1,
147                    column.name,
148                    name
149                ));
150            }
151        }
152        Ok(())
153    }
154
155    pub fn save(&self, path: &Path) -> Result<()> {
156        self.save_internal(path, false)
157    }
158
159    pub fn save_with_replace_template(&self, path: &Path) -> Result<()> {
160        self.save_internal(path, true)
161    }
162
163    pub fn load(path: &Path) -> Result<Self> {
164        let file = File::open(path).with_context(|| format!("Opening schema file {path:?}"))?;
165        let reader = BufReader::new(file);
166        let schema = serde_json::from_reader(reader).context("Parsing schema JSON")?;
167        Ok(schema)
168    }
169
170    fn save_internal(&self, path: &Path, include_replace_template: bool) -> Result<()> {
171        let file = File::create(path).with_context(|| format!("Creating schema file {path:?}"))?;
172        if !include_replace_template {
173            serde_json::to_writer_pretty(file, self).context("Writing schema JSON")
174        } else {
175            let mut value =
176                serde_json::to_value(self).context("Serializing schema to JSON value")?;
177            if let Some(columns) = value
178                .get_mut("columns")
179                .and_then(|columns| columns.as_array_mut())
180            {
181                for column in columns {
182                    if let Some(obj) = column.as_object_mut() {
183                        if let Some(existing) = obj.remove("value_replacements") {
184                            obj.insert("replace".to_string(), existing);
185                        }
186                        obj.entry("replace".to_string())
187                            .or_insert_with(|| Value::Array(Vec::new()));
188                    }
189                }
190            }
191            serde_json::to_writer_pretty(file, &value).context("Writing schema JSON")
192        }
193    }
194}
195
196#[derive(Debug, Clone)]
197struct TypeCandidate {
198    possible_integer: bool,
199    possible_float: bool,
200    possible_boolean: bool,
201    possible_date: bool,
202    possible_datetime: bool,
203    possible_guid: bool,
204}
205
206impl TypeCandidate {
207    fn new() -> Self {
208        Self {
209            possible_integer: true,
210            possible_float: true,
211            possible_boolean: true,
212            possible_date: true,
213            possible_datetime: true,
214            possible_guid: true,
215        }
216    }
217
218    fn update(&mut self, value: &str) {
219        if self.possible_boolean
220            && !matches!(
221                value.to_ascii_lowercase().as_str(),
222                "true" | "false" | "t" | "f" | "yes" | "no" | "y" | "n"
223            )
224        {
225            self.possible_boolean = false;
226        }
227        if self.possible_integer && value.parse::<i64>().is_err() {
228            self.possible_integer = false;
229        }
230        if self.possible_float && value.parse::<f64>().is_err() {
231            self.possible_float = false;
232        }
233        if self.possible_date && parse_naive_date(value).is_err() {
234            self.possible_date = false;
235        }
236        if self.possible_datetime && parse_naive_datetime(value).is_err() {
237            self.possible_datetime = false;
238        }
239        if self.possible_guid {
240            let trimmed = value.trim().trim_matches(|c| matches!(c, '{' | '}'));
241            if Uuid::parse_str(trimmed).is_err() {
242                self.possible_guid = false;
243            }
244        }
245    }
246
247    fn decide(&self) -> ColumnType {
248        if self.possible_boolean {
249            ColumnType::Boolean
250        } else if self.possible_integer {
251            ColumnType::Integer
252        } else if self.possible_float {
253            ColumnType::Float
254        } else if self.possible_date {
255            ColumnType::Date
256        } else if self.possible_datetime {
257            ColumnType::DateTime
258        } else if self.possible_guid {
259            ColumnType::Guid
260        } else {
261            ColumnType::String
262        }
263    }
264}
265
266pub fn infer_schema(
267    path: &Path,
268    sample_rows: usize,
269    delimiter: u8,
270    encoding: &'static Encoding,
271) -> Result<Schema> {
272    let mut reader = io_utils::open_csv_reader_from_path(path, delimiter, true)?;
273    let header_record = reader.byte_headers()?.clone();
274    let headers = io_utils::decode_headers(&header_record, encoding)?;
275    let mut candidates = vec![TypeCandidate::new(); headers.len()];
276
277    let mut record = csv::ByteRecord::new();
278    let mut processed = 0usize;
279    while reader.read_byte_record(&mut record)? {
280        if sample_rows > 0 && processed >= sample_rows {
281            break;
282        }
283        for (idx, field) in record.iter().enumerate() {
284            if field.is_empty() {
285                continue;
286            }
287            let decoded = io_utils::decode_bytes(field, encoding)?;
288            candidates[idx].update(&decoded);
289        }
290        processed += 1;
291    }
292
293    let columns = headers
294        .iter()
295        .enumerate()
296        .map(|(idx, header)| ColumnMeta {
297            name: header.clone(),
298            datatype: candidates[idx].decide(),
299            rename: None,
300            value_replacements: Vec::new(),
301        })
302        .collect();
303
304    Ok(Schema { columns })
305}
306
307impl ColumnMeta {
308    pub fn output_name(&self) -> &str {
309        self.rename
310            .as_deref()
311            .filter(|value| !value.is_empty())
312            .unwrap_or(&self.name)
313    }
314
315    pub fn normalize_value<'a>(&self, value: &'a str) -> Cow<'a, str> {
316        for replacement in &self.value_replacements {
317            if value == replacement.from {
318                return Cow::Owned(replacement.to.clone());
319            }
320        }
321        Cow::Borrowed(value)
322    }
323}
324
325impl Schema {
326    pub fn apply_replacements_to_row(&self, row: &mut [String]) {
327        for (idx, column) in self.columns.iter().enumerate() {
328            if let Some(value) = row.get_mut(idx)
329                && let Cow::Owned(normalized) = column.normalize_value(value)
330            {
331                *value = normalized;
332            }
333        }
334    }
335}