1use std::{borrow::Cow, fmt, fs::File, io::BufReader, path::Path};
2
3use anyhow::{Context, Result, anyhow};
4use encoding_rs::Encoding;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use uuid::Uuid;
8
9use crate::{
10 data::{parse_naive_date, parse_naive_datetime},
11 io_utils,
12};
13
14#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
15pub enum ColumnType {
16 String,
17 Integer,
18 Float,
19 Boolean,
20 Date,
21 DateTime,
22 Guid,
23}
24
25impl ColumnType {
26 pub fn as_str(&self) -> &'static str {
27 match self {
28 ColumnType::String => "string",
29 ColumnType::Integer => "integer",
30 ColumnType::Float => "float",
31 ColumnType::Boolean => "boolean",
32 ColumnType::Date => "date",
33 ColumnType::DateTime => "datetime",
34 ColumnType::Guid => "guid",
35 }
36 }
37
38 pub fn variants() -> &'static [&'static str] {
39 &[
40 "string", "integer", "float", "boolean", "date", "datetime", "guid",
41 ]
42 }
43}
44
45impl fmt::Display for ColumnType {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 write!(f, "{}", self.as_str())
48 }
49}
50
51impl std::str::FromStr for ColumnType {
52 type Err = anyhow::Error;
53
54 fn from_str(value: &str) -> Result<Self, Self::Err> {
55 let normalized = value.trim().to_ascii_lowercase();
56 match normalized.as_str() {
57 "string" => Ok(ColumnType::String),
58 "integer" | "int" => Ok(ColumnType::Integer),
59 "float" | "double" => Ok(ColumnType::Float),
60 "boolean" | "bool" => Ok(ColumnType::Boolean),
61 "date" => Ok(ColumnType::Date),
62 "datetime" | "date-time" | "timestamp" => Ok(ColumnType::DateTime),
63 "guid" | "uuid" => Ok(ColumnType::Guid),
64 _ => Err(anyhow!(
65 "Unknown column type '{value}'. Supported types: {}",
66 ColumnType::variants().join(", ")
67 )),
68 }
69 }
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
73pub struct ValueReplacement {
74 pub from: String,
75 pub to: String,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct ColumnMeta {
80 pub name: String,
81 pub datatype: ColumnType,
82 #[serde(
83 default,
84 skip_serializing_if = "Option::is_none",
85 rename = "name_mapping"
86 )]
87 pub rename: Option<String>,
88 #[serde(
89 default,
90 rename = "replace",
91 alias = "value_replacements",
92 skip_serializing_if = "Vec::is_empty"
93 )]
94 pub value_replacements: Vec<ValueReplacement>,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct Schema {
99 pub columns: Vec<ColumnMeta>,
100}
101
102impl Schema {
103 pub fn from_headers(headers: &[String]) -> Self {
104 let columns = headers
105 .iter()
106 .map(|name| ColumnMeta {
107 name: name.clone(),
108 datatype: ColumnType::String,
109 rename: None,
110 value_replacements: Vec::new(),
111 })
112 .collect();
113 Schema { columns }
114 }
115
116 pub fn column_index(&self, name: &str) -> Option<usize> {
117 self.columns
118 .iter()
119 .position(|c| c.name == name || c.rename.as_deref() == Some(name))
120 }
121
122 pub fn headers(&self) -> Vec<String> {
123 self.columns.iter().map(|c| c.name.clone()).collect()
124 }
125
126 pub fn output_headers(&self) -> Vec<String> {
127 self.columns
128 .iter()
129 .map(|c| c.output_name().to_string())
130 .collect()
131 }
132
133 pub fn validate_headers(&self, headers: &[String]) -> Result<()> {
134 if headers.len() != self.columns.len() {
135 return Err(anyhow!(
136 "Header length mismatch: schema expects {} column(s) but file contains {}",
137 self.columns.len(),
138 headers.len()
139 ));
140 }
141 for (idx, column) in self.columns.iter().enumerate() {
142 let name = headers.get(idx).map(|s| s.as_str()).unwrap_or_default();
143 if name != column.name {
144 return Err(anyhow!(
145 "Header mismatch at position {}: expected '{}' but found '{}'",
146 idx + 1,
147 column.name,
148 name
149 ));
150 }
151 }
152 Ok(())
153 }
154
155 pub fn save(&self, path: &Path) -> Result<()> {
156 self.save_internal(path, false)
157 }
158
159 pub fn save_with_replace_template(&self, path: &Path) -> Result<()> {
160 self.save_internal(path, true)
161 }
162
163 pub fn load(path: &Path) -> Result<Self> {
164 let file = File::open(path).with_context(|| format!("Opening schema file {path:?}"))?;
165 let reader = BufReader::new(file);
166 let schema = serde_json::from_reader(reader).context("Parsing schema JSON")?;
167 Ok(schema)
168 }
169
170 fn save_internal(&self, path: &Path, include_replace_template: bool) -> Result<()> {
171 let file = File::create(path).with_context(|| format!("Creating schema file {path:?}"))?;
172 if !include_replace_template {
173 serde_json::to_writer_pretty(file, self).context("Writing schema JSON")
174 } else {
175 let mut value =
176 serde_json::to_value(self).context("Serializing schema to JSON value")?;
177 if let Some(columns) = value
178 .get_mut("columns")
179 .and_then(|columns| columns.as_array_mut())
180 {
181 for column in columns {
182 if let Some(obj) = column.as_object_mut() {
183 if let Some(existing) = obj.remove("value_replacements") {
184 obj.insert("replace".to_string(), existing);
185 }
186 obj.entry("replace".to_string())
187 .or_insert_with(|| Value::Array(Vec::new()));
188 }
189 }
190 }
191 serde_json::to_writer_pretty(file, &value).context("Writing schema JSON")
192 }
193 }
194}
195
196#[derive(Debug, Clone)]
197struct TypeCandidate {
198 possible_integer: bool,
199 possible_float: bool,
200 possible_boolean: bool,
201 possible_date: bool,
202 possible_datetime: bool,
203 possible_guid: bool,
204}
205
206impl TypeCandidate {
207 fn new() -> Self {
208 Self {
209 possible_integer: true,
210 possible_float: true,
211 possible_boolean: true,
212 possible_date: true,
213 possible_datetime: true,
214 possible_guid: true,
215 }
216 }
217
218 fn update(&mut self, value: &str) {
219 if self.possible_boolean
220 && !matches!(
221 value.to_ascii_lowercase().as_str(),
222 "true" | "false" | "t" | "f" | "yes" | "no" | "y" | "n"
223 )
224 {
225 self.possible_boolean = false;
226 }
227 if self.possible_integer && value.parse::<i64>().is_err() {
228 self.possible_integer = false;
229 }
230 if self.possible_float && value.parse::<f64>().is_err() {
231 self.possible_float = false;
232 }
233 if self.possible_date && parse_naive_date(value).is_err() {
234 self.possible_date = false;
235 }
236 if self.possible_datetime && parse_naive_datetime(value).is_err() {
237 self.possible_datetime = false;
238 }
239 if self.possible_guid {
240 let trimmed = value.trim().trim_matches(|c| matches!(c, '{' | '}'));
241 if Uuid::parse_str(trimmed).is_err() {
242 self.possible_guid = false;
243 }
244 }
245 }
246
247 fn decide(&self) -> ColumnType {
248 if self.possible_boolean {
249 ColumnType::Boolean
250 } else if self.possible_integer {
251 ColumnType::Integer
252 } else if self.possible_float {
253 ColumnType::Float
254 } else if self.possible_date {
255 ColumnType::Date
256 } else if self.possible_datetime {
257 ColumnType::DateTime
258 } else if self.possible_guid {
259 ColumnType::Guid
260 } else {
261 ColumnType::String
262 }
263 }
264}
265
266pub fn infer_schema(
267 path: &Path,
268 sample_rows: usize,
269 delimiter: u8,
270 encoding: &'static Encoding,
271) -> Result<Schema> {
272 let mut reader = io_utils::open_csv_reader_from_path(path, delimiter, true)?;
273 let header_record = reader.byte_headers()?.clone();
274 let headers = io_utils::decode_headers(&header_record, encoding)?;
275 let mut candidates = vec![TypeCandidate::new(); headers.len()];
276
277 let mut record = csv::ByteRecord::new();
278 let mut processed = 0usize;
279 while reader.read_byte_record(&mut record)? {
280 if sample_rows > 0 && processed >= sample_rows {
281 break;
282 }
283 for (idx, field) in record.iter().enumerate() {
284 if field.is_empty() {
285 continue;
286 }
287 let decoded = io_utils::decode_bytes(field, encoding)?;
288 candidates[idx].update(&decoded);
289 }
290 processed += 1;
291 }
292
293 let columns = headers
294 .iter()
295 .enumerate()
296 .map(|(idx, header)| ColumnMeta {
297 name: header.clone(),
298 datatype: candidates[idx].decide(),
299 rename: None,
300 value_replacements: Vec::new(),
301 })
302 .collect();
303
304 Ok(Schema { columns })
305}
306
307impl ColumnMeta {
308 pub fn output_name(&self) -> &str {
309 self.rename
310 .as_deref()
311 .filter(|value| !value.is_empty())
312 .unwrap_or(&self.name)
313 }
314
315 pub fn normalize_value<'a>(&self, value: &'a str) -> Cow<'a, str> {
316 for replacement in &self.value_replacements {
317 if value == replacement.from {
318 return Cow::Owned(replacement.to.clone());
319 }
320 }
321 Cow::Borrowed(value)
322 }
323}
324
325impl Schema {
326 pub fn apply_replacements_to_row(&self, row: &mut [String]) {
327 for (idx, column) in self.columns.iter().enumerate() {
328 if let Some(value) = row.get_mut(idx)
329 && let Cow::Owned(normalized) = column.normalize_value(value)
330 {
331 *value = normalized;
332 }
333 }
334 }
335}