use std::collections::HashMap;
use std::io::Cursor;
use crate::core::column_inference::{infer_column_types, FieldValueType, InferenceConfig};
use crate::core::field_value::{DataRow, DataTable, FieldValue};
/// Parse CSV text into a `DataTable` plus the ordered column names.
///
/// Lines whose first non-blank character is `#` are treated as comments and
/// skipped anywhere in the input; blank data lines are ignored. The first
/// remaining line is the header. Each cell is trimmed and loosely typed via
/// `infer_value` (null / numeric / bool / text). Rows shorter than the header
/// get `Null` for the missing trailing cells; extra cells are dropped.
///
/// # Errors
/// Returns `Err` when the input contains no header line.
pub fn parse_csv(text: &str) -> Result<(DataTable, Vec<String>), String> {
    // No lookahead is ever performed, so a plain iterator suffices
    // (the previous `.peekable()` adaptor was dead weight).
    let mut lines = text.lines().filter(|l| !l.trim_start().starts_with('#'));
    let header_line = lines.next().ok_or("CSV file is empty")?;
    let columns: Vec<String> = split_csv_line(header_line)
        .into_iter()
        .map(|s| s.trim().trim_matches('"').to_string())
        .collect();
    if columns.is_empty() {
        return Err("CSV header is empty".into());
    }
    let mut rows: Vec<DataRow> = Vec::new();
    for line in lines {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        let fields = split_csv_line(trimmed);
        let mut row: DataRow = HashMap::new();
        for (i, col) in columns.iter().enumerate() {
            // Missing trailing cells become "" and are inferred as Null.
            let raw = fields.get(i).map(|s| s.trim()).unwrap_or("");
            row.insert(col.clone(), infer_value(raw));
        }
        rows.push(row);
    }
    Ok((DataTable::new(rows), columns))
}
/// Split a single CSV record on commas, honoring double-quoted fields.
///
/// Inside a quoted field a doubled quote (`""`) is an escaped literal quote;
/// commas inside quotes do not split. The quote characters themselves are not
/// included in the returned fields. Always yields at least one field (an
/// empty input produces `vec![""]`). Embedded newlines are impossible here
/// because callers split the input into lines first.
fn split_csv_line(line: &str) -> Vec<String> {
    let mut out: Vec<String> = Vec::new();
    let mut field = String::new();
    let mut quoted = false;
    let mut it = line.chars().peekable();
    while let Some(ch) = it.next() {
        if quoted {
            if ch == '"' {
                if it.peek() == Some(&'"') {
                    // Escaped quote: consume the second '"' and keep one.
                    it.next();
                    field.push('"');
                } else {
                    quoted = false;
                }
            } else {
                field.push(ch);
            }
        } else if ch == '"' {
            quoted = true;
        } else if ch == ',' {
            // Field boundary: emit and reset without reallocating.
            out.push(std::mem::take(&mut field));
        } else {
            field.push(ch);
        }
    }
    out.push(field);
    out
}
/// Loosely type a single raw CSV cell.
///
/// Empty strings and the case-insensitive markers "null"/"na" become `Null`;
/// anything `f64` can parse becomes `Numeric`; "true"/"false"
/// (case-insensitive) become `Bool`; everything else stays `Text`.
fn infer_value(s: &str) -> FieldValue {
    let is = |lit: &str| s.eq_ignore_ascii_case(lit);
    if s.is_empty() || is("null") || is("na") {
        FieldValue::Null
    } else if let Ok(n) = s.parse::<f64>() {
        // Numeric parsing runs before the boolean check, so tokens like
        // "nan" or "inf" also land here as floats.
        FieldValue::Numeric(n)
    } else if is("true") {
        FieldValue::Bool(true)
    } else if is("false") {
        FieldValue::Bool(false)
    } else {
        FieldValue::Text(s.to_string())
    }
}
/// Parse CSV text, then run column-type inference and re-type the cells.
///
/// Pass 1 parses every cell with the same loose rules as `parse_csv`.
/// Pass 2 asks `infer_column_types` for a storage type per column and
/// re-parses each `Text` cell with `parse_with_type_hint` (e.g. date strings
/// become `Timestamp`). Cells already typed in pass 1 are kept as-is; absent
/// cells become `Null`.
///
/// Returns the typed table, the ordered column names, and the inferred
/// per-column types.
///
/// # Errors
/// Returns `Err` when the input contains no header line.
pub fn parse_csv_with_inference(
    text: &str,
) -> Result<
    (
        DataTable,
        Vec<String>,
        Vec<crate::core::column_inference::InferredColumnType>,
    ),
    String,
> {
    // No lookahead is ever performed, so a plain iterator suffices
    // (the previous `.peekable()` adaptor was dead weight).
    let mut lines = text.lines().filter(|l| !l.trim_start().starts_with('#'));
    let header_line = lines.next().ok_or("CSV file is empty")?;
    let columns: Vec<String> = split_csv_line(header_line)
        .into_iter()
        .map(|s| s.trim().trim_matches('"').to_string())
        .collect();
    if columns.is_empty() {
        return Err("CSV header is empty".into());
    }
    let mut text_rows: Vec<DataRow> = Vec::new();
    for line in lines {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        let fields = split_csv_line(trimmed);
        let mut row: DataRow = HashMap::new();
        for (i, col) in columns.iter().enumerate() {
            // Missing trailing cells become "" and are inferred as Null.
            let raw = fields.get(i).map(|s| s.trim()).unwrap_or("");
            row.insert(col.clone(), infer_value(raw));
        }
        text_rows.push(row);
    }
    // Move the rows into the table and iterate them by reference below,
    // instead of cloning the entire row set as the previous version did.
    let text_table = DataTable::new(text_rows);
    let config = InferenceConfig::default();
    let inferred_types = infer_column_types(&text_table, &columns, config);
    let mut typed_rows: Vec<DataRow> = Vec::with_capacity(text_table.len());
    for text_row in text_table.rows() {
        let mut typed_row: DataRow = HashMap::new();
        for (col_idx, col_name) in columns.iter().enumerate() {
            let typed_value = match text_row.get(col_name) {
                // Only text cells are re-parsed under the inferred type.
                Some(FieldValue::Text(s)) => {
                    parse_with_type_hint(s, inferred_types[col_idx].storage_type)
                }
                Some(val) => val.clone(),
                None => FieldValue::Null,
            };
            typed_row.insert(col_name.clone(), typed_value);
        }
        typed_rows.push(typed_row);
    }
    Ok((DataTable::new(typed_rows), columns, inferred_types))
}
/// Re-parse a raw text cell according to an inferred column storage type.
///
/// Null markers (empty, "null", "na", case-insensitive) always become `Null`.
/// When the value does not conform to the hinted type it is preserved as
/// `Text` rather than discarded.
fn parse_with_type_hint(s: &str, storage_type: FieldValueType) -> FieldValue {
    if s.is_empty() || s.eq_ignore_ascii_case("null") || s.eq_ignore_ascii_case("na") {
        return FieldValue::Null;
    }
    // Shared fallback: keep the original text when conversion fails.
    let fallback = || FieldValue::Text(s.to_string());
    match storage_type {
        FieldValueType::Timestamp => parse_date_string(s)
            .map(FieldValue::Timestamp)
            .unwrap_or_else(|| fallback()),
        FieldValueType::Numeric => s
            .parse::<f64>()
            .map(FieldValue::Numeric)
            .unwrap_or_else(|_| fallback()),
        FieldValueType::Boolean => {
            if s.eq_ignore_ascii_case("true") {
                FieldValue::Bool(true)
            } else if s.eq_ignore_ascii_case("false") {
                FieldValue::Bool(false)
            } else {
                fallback()
            }
        }
        FieldValueType::Text => fallback(),
    }
}
/// Best-effort conversion of a string to Unix-epoch milliseconds.
///
/// Accepted inputs, tried in order:
/// 1. A bare number — values above 1e10 are assumed to already be in
///    milliseconds; smaller values are treated as epoch seconds and scaled.
/// 2. A date-time: contains 'T', or both '-' and ':'.
/// 3. A date only, with '-' or '/' separators.
///
/// Returns `None` for anything unrecognized.
/// NOTE(review): heuristic (1) means a short numeric token such as "2024" is
/// read as an epoch in seconds, not as a year — presumably intentional, but
/// worth confirming against callers.
fn parse_date_string(s: &str) -> Option<f64> {
    let s = s.trim();
    if let Ok(n) = s.parse::<f64>() {
        // Plausible epoch-second values (through year ~2286) stay below 1e10,
        // while epoch-millisecond values for recent dates sit far above it.
        return Some(if n > 1e10 { n } else { n * 1000.0 });
    }
    let looks_like_datetime = s.contains('T') || (s.contains('-') && s.contains(':'));
    if looks_like_datetime {
        parse_datetime_format(s)
    } else if s.contains('-') || s.contains('/') {
        parse_date_only_format(s)
    } else {
        None
    }
}
/// Parse "date<sep>time" where <sep> is 'T' (ISO-ish) or a single space.
///
/// Exactly one separator occurrence is required; the result is the date's
/// midnight in epoch milliseconds plus the time-of-day offset.
fn parse_datetime_format(s: &str) -> Option<f64> {
    let sep = if s.contains('T') { 'T' } else { ' ' };
    let mut pieces = s.split(sep);
    match (pieces.next(), pieces.next(), pieces.next()) {
        // Exactly two pieces: a date part and a time part.
        (Some(date_part), Some(time_part), None) => {
            let date_ms = parse_date_only_format(date_part)?;
            let time_ms = parse_time_part(time_part)?;
            Some(date_ms + time_ms)
        }
        _ => None,
    }
}
/// Convert "HH:MM" or "HH:MM:SS" into milliseconds past midnight.
///
/// Components are parsed as `f64`, so fractional seconds (e.g. "12.5") work.
/// Returns `None` unless there are exactly two or three ':'-separated parts,
/// each a valid number.
fn parse_time_part(s: &str) -> Option<f64> {
    let mut pieces = s.split(':');
    let hours: f64 = pieces.next()?.parse().ok()?;
    let minutes: f64 = pieces.next()?.parse().ok()?;
    let seconds: f64 = match pieces.next() {
        Some(sec) => sec.parse().ok()?,
        None => 0.0,
    };
    if pieces.next().is_some() {
        // More than three components (e.g. "1:2:3:4") is malformed.
        return None;
    }
    Some((hours * 3600.0 + minutes * 60.0 + seconds) * 1000.0)
}
/// Parse a three-part calendar date ("YYYY-MM-DD", "MM/DD/YYYY",
/// "DD-MM-YYYY", ...) into epoch milliseconds at midnight.
///
/// '-' is preferred as the separator when both '-' and '/' appear.
fn parse_date_only_format(s: &str) -> Option<f64> {
    let sep = ['-', '/'].iter().copied().find(|&c| s.contains(c))?;
    let mut pieces = s.split(sep);
    let (p0, p1, p2) = match (pieces.next(), pieces.next(), pieces.next(), pieces.next()) {
        // Exactly three components are required.
        (Some(a), Some(b), Some(c), None) => (a, b, c),
        _ => return None,
    };
    let (year, month, day) = detect_date_parts(p0, p1, p2)?;
    let days = days_since_epoch(year, month, day)?;
    Some(days as f64 * 86_400_000.0)
}
/// Decide which of three date components is year, month, and day.
///
/// A four-digit first component means YYYY-MM-DD. A four-digit last
/// component means the year is last: DD-MM-YYYY when the first component
/// exceeds 12 (so it cannot be a month), otherwise MM-DD-YYYY by default.
/// Two-digit years are rejected (`None`) as too ambiguous.
///
/// The previous version had a redundant `v1 > 12` branch that returned
/// exactly what the fallthrough returned; it has been removed without any
/// behavior change.
fn detect_date_parts(p0: &str, p1: &str, p2: &str) -> Option<(i32, i32, i32)> {
    let v0: i32 = p0.parse().ok()?;
    let v1: i32 = p1.parse().ok()?;
    let v2: i32 = p2.parse().ok()?;
    if p0.len() == 4 {
        // YYYY-MM-DD
        return Some((v0, v1, v2));
    }
    if p2.len() == 4 {
        // Year last: DD-MM-YYYY if the first field can't be a month,
        // otherwise assume US-style MM-DD-YYYY.
        return if v0 > 12 {
            Some((v2, v1, v0))
        } else {
            Some((v2, v0, v1))
        };
    }
    None
}
/// Whole days from 1970-01-01 to the given proleptic-Gregorian calendar
/// date; negative for dates before the epoch.
///
/// Returns `None` when the month is outside 1..=12 or the day is invalid
/// for that month/year (February 29 is only accepted in leap years).
///
/// Bug fix: the previous implementation summed year lengths with a
/// `1970..year` loop, which is empty for `year < 1970`, so pre-epoch dates
/// silently produced wrong (positive) counts. This version uses the
/// closed-form `days_from_civil` algorithm (Howard Hinnant), which is O(1)
/// and correct for all years; results for year >= 1970 are unchanged.
fn days_since_epoch(year: i32, month: i32, day: i32) -> Option<i64> {
    if !(1..=12).contains(&month) || !(1..=31).contains(&day) {
        return None;
    }
    const DAYS_IN_MONTH: [i32; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
    let is_leap = (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0);
    let max_day = if month == 2 && is_leap {
        29
    } else {
        DAYS_IN_MONTH[(month - 1) as usize]
    };
    if day > max_day {
        return None;
    }
    // days_from_civil: shift the year so it starts in March, making the
    // leap day the last day of the "year" and month lengths regular.
    let y = i64::from(if month <= 2 { year - 1 } else { year });
    let era = if y >= 0 { y } else { y - 399 } / 400;
    let yoe = y - era * 400; // year-of-era, in [0, 399]
    let m = i64::from(month);
    let mp = if m > 2 { m - 3 } else { m + 9 }; // March-based month index
    let doy = (153 * mp + 2) / 5 + i64::from(day) - 1; // day-of-year, March 1 = 0
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; // day-of-era
    // 719_468 = days from 0000-03-01 to 1970-01-01.
    Some(era * 146_097 + doe - 719_468)
}
/// Decode an Arrow IPC file (file format, not stream) into a `DataTable`.
///
/// Column names come from the embedded schema; every record batch is
/// flattened into per-row maps via `record_batch_to_rows`.
///
/// # Errors
/// Returns the underlying Arrow error message as a `String` when the bytes
/// are malformed or a batch fails to decode.
pub fn parse_arrow_ipc(bytes: &[u8]) -> Result<(DataTable, Vec<String>), String> {
    use arrow::ipc::reader::FileReader;
    let reader = FileReader::try_new(Cursor::new(bytes), None).map_err(|e| e.to_string())?;
    let columns: Vec<String> = reader
        .schema()
        .fields()
        .iter()
        .map(|field| field.name().clone())
        .collect();
    let mut rows: Vec<DataRow> = Vec::new();
    for batch in reader {
        let batch = batch.map_err(|e| e.to_string())?;
        rows.extend(record_batch_to_rows(&batch));
    }
    Ok((DataTable::new(rows), columns))
}
/// Decode an in-memory Parquet file into a `DataTable`.
///
/// Column names are taken from the builder's Arrow schema before any batch
/// is decoded; batches are then flattened via `record_batch_to_rows`.
///
/// # Errors
/// Returns the underlying Parquet/Arrow error message as a `String`.
pub fn parse_parquet(bytes: &[u8]) -> Result<(DataTable, Vec<String>), String> {
    use bytes::Bytes;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    // The parquet reader requires an owned, cheaply-cloneable buffer.
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::copy_from_slice(bytes))
        .map_err(|e| e.to_string())?;
    let columns: Vec<String> = builder
        .schema()
        .fields()
        .iter()
        .map(|field| field.name().clone())
        .collect();
    let mut rows: Vec<DataRow> = Vec::new();
    for batch in builder.build().map_err(|e| e.to_string())? {
        let batch = batch.map_err(|e| e.to_string())?;
        rows.extend(record_batch_to_rows(&batch));
    }
    Ok((DataTable::new(rows), columns))
}
/// Flatten one Arrow `RecordBatch` into row-oriented `DataRow` maps,
/// converting each cell with `arrow_to_field_value`.
fn record_batch_to_rows(batch: &arrow::record_batch::RecordBatch) -> Vec<DataRow> {
    let schema = batch.schema();
    let mut rows: Vec<DataRow> = Vec::with_capacity(batch.num_rows());
    for row_idx in 0..batch.num_rows() {
        let mut row: DataRow = HashMap::new();
        for (col_idx, field) in schema.fields().iter().enumerate() {
            let value = arrow_to_field_value(batch.column(col_idx).as_ref(), row_idx);
            row.insert(field.name().clone(), value);
        }
        rows.push(row);
    }
    rows
}
/// Convert one cell (`idx`) of an Arrow array into this crate's `FieldValue`.
///
/// - All integer and float widths are cast to `Numeric(f64)`; integer values
///   beyond 2^53 may lose precision in the `as f64` cast.
/// - `Date32` (days since epoch) and `Date64`/`Timestamp` columns are
///   normalized to milliseconds since the Unix epoch, scaled per `TimeUnit`.
/// - Nulls map to `FieldValue::Null`; any Arrow type without a dedicated arm
///   falls back to a bracketed `Text` placeholder built from the type name.
fn arrow_to_field_value(array: &dyn arrow::array::Array, idx: usize) -> FieldValue {
use arrow::array::*;
use arrow::datatypes::{DataType, TimeUnit};
if array.is_null(idx) {
return FieldValue::Null;
}
// Downcast to the concrete array type and widen the value to f64.
// The `expect` cannot fire in practice: each use is guarded by the
// matching `DataType` arm below.
macro_rules! as_numeric {
($arr_ty:ty) => {
FieldValue::Numeric(
array
.as_any()
.downcast_ref::<$arr_ty>()
.expect("downcast")
.value(idx) as f64,
)
};
}
match array.data_type() {
DataType::Int8 => as_numeric!(Int8Array),
DataType::Int16 => as_numeric!(Int16Array),
DataType::Int32 => as_numeric!(Int32Array),
DataType::Int64 => as_numeric!(Int64Array),
DataType::UInt8 => as_numeric!(UInt8Array),
DataType::UInt16 => as_numeric!(UInt16Array),
DataType::UInt32 => as_numeric!(UInt32Array),
DataType::UInt64 => as_numeric!(UInt64Array),
DataType::Float32 => as_numeric!(Float32Array),
DataType::Float64 => as_numeric!(Float64Array),
DataType::Boolean => FieldValue::Bool(
array
.as_any()
.downcast_ref::<BooleanArray>()
.expect("downcast")
.value(idx),
),
DataType::Utf8 => FieldValue::Text(
array
.as_any()
.downcast_ref::<StringArray>()
.expect("downcast")
.value(idx)
.to_string(),
),
DataType::LargeUtf8 => FieldValue::Text(
array
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("downcast")
.value(idx)
.to_string(),
),
// Date32 stores whole days since epoch; scale to milliseconds.
DataType::Date32 => FieldValue::Timestamp(
array
.as_any()
.downcast_ref::<Date32Array>()
.expect("downcast")
.value(idx) as f64
* 86_400_000.0,
),
// Date64 is already milliseconds since epoch.
DataType::Date64 => FieldValue::Timestamp(
array
.as_any()
.downcast_ref::<Date64Array>()
.expect("downcast")
.value(idx) as f64,
),
// ns -> ms
DataType::Timestamp(TimeUnit::Nanosecond, _) => FieldValue::Timestamp(
array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.expect("downcast")
.value(idx) as f64
/ 1_000_000.0,
),
// us -> ms
DataType::Timestamp(TimeUnit::Microsecond, _) => FieldValue::Timestamp(
array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.expect("downcast")
.value(idx) as f64
/ 1_000.0,
),
// already ms
DataType::Timestamp(TimeUnit::Millisecond, _) => FieldValue::Timestamp(
array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.expect("downcast")
.value(idx) as f64,
),
// s -> ms
DataType::Timestamp(TimeUnit::Second, _) => FieldValue::Timestamp(
array
.as_any()
.downcast_ref::<TimestampSecondArray>()
.expect("downcast")
.value(idx) as f64
* 1_000.0,
),
// NOTE(review): timezone metadata on Timestamp columns is ignored —
// values are passed through as stored. Confirm inputs are UTC.
other => FieldValue::Text(format!("[{other}]")),
}
}
#[cfg(test)]
mod tests {
use super::*;
// Reference value used throughout: 1_705_276_800_000 ms == 2024-01-15T00:00:00Z.
#[test]
fn test_parse_date_string_iso8601() {
let ts = parse_date_string("2024-01-15").expect("should parse");
assert_eq!(ts, 1_705_276_800_000.0);
}
// 37_800_000 ms == 10 h 30 min past midnight.
#[test]
fn test_parse_date_string_with_time() {
let ts = parse_date_string("2024-01-15T10:30:00").expect("should parse");
assert_eq!(ts, 1_705_276_800_000.0 + 37_800_000.0);
}
// MM/DD/YYYY (US order is the default when the first field <= 12).
#[test]
fn test_parse_date_string_slash_format() {
let ts = parse_date_string("01/15/2024").expect("should parse");
assert_eq!(ts, 1_705_276_800_000.0);
}
// DD-MM-YYYY is detected because the first field (15) cannot be a month.
#[test]
fn test_parse_date_string_european_format() {
let ts = parse_date_string("15-01-2024").expect("should parse");
assert_eq!(ts, 1_705_276_800_000.0);
}
// Bare numbers: <= 1e10 treated as epoch seconds, larger as milliseconds.
#[test]
fn test_parse_date_string_unix_timestamp() {
let ts = parse_date_string("1705276800").expect("should parse");
assert_eq!(ts, 1_705_276_800_000.0);
let ts = parse_date_string("1705276800000").expect("should parse");
assert_eq!(ts, 1_705_276_800_000.0);
}
// Non-dates and out-of-range month/day components must be rejected.
#[test]
fn test_parse_date_string_invalid() {
assert!(parse_date_string("not-a-date").is_none());
assert!(parse_date_string("revenue").is_none());
assert!(parse_date_string("2024-13-01").is_none()); assert!(parse_date_string("2024-01-32").is_none()); }
// End-to-end: text cells in a date-like column come back as Timestamp.
#[test]
fn test_parse_csv_with_inference_date_column() {
let csv = "\
date,value
2024-01-15,100
2024-01-16,200
2024-01-17,150";
let (table, columns, inferred) = parse_csv_with_inference(csv).expect("should parse");
assert_eq!(columns, vec!["date", "value"]);
assert_eq!(inferred.len(), 2);
assert_eq!(inferred[0].storage_type, FieldValueType::Timestamp);
assert_eq!(inferred[1].storage_type, FieldValueType::Numeric);
assert_eq!(table.len(), 3);
let row0 = &table.rows()[0];
assert!(matches!(row0.get("date"), Some(FieldValue::Timestamp(_))));
assert!(matches!(
row0.get("value"),
Some(FieldValue::Numeric(100.0))
));
}
// Mixed 'T' and space datetime separators must infer to one Timestamp column.
#[test]
fn test_parse_csv_with_inference_mixed_formats() {
let csv = "\
timestamp,category,amount
2024-01-15T10:30:00,Sales,1250.50
2024-01-16 09:00:00,Marketing,980.00
2024-01-17T14:45:00,Sales,1450.75";
let (table, columns, inferred) = parse_csv_with_inference(csv).expect("should parse");
assert_eq!(columns, vec!["timestamp", "category", "amount"]);
assert_eq!(inferred[0].storage_type, FieldValueType::Timestamp);
assert_eq!(inferred[1].storage_type, FieldValueType::Text);
assert_eq!(inferred[2].storage_type, FieldValueType::Numeric);
let row0 = &table.rows()[0];
assert!(matches!(
row0.get("timestamp"),
Some(FieldValue::Timestamp(_))
));
assert!(matches!(row0.get("category"), Some(FieldValue::Text(_))));
assert!(matches!(
row0.get("amount"),
Some(FieldValue::Numeric(1250.50))
));
}
// 10_957 is the known day count from 1970-01-01 to 2000-01-01.
#[test]
fn test_days_since_epoch() {
assert_eq!(days_since_epoch(1970, 1, 1).unwrap(), 0);
assert_eq!(days_since_epoch(1970, 1, 2).unwrap(), 1);
let expected = 10_957; assert_eq!(days_since_epoch(2000, 1, 1).unwrap(), expected);
}
// Feb 29 is valid only in leap years.
#[test]
fn test_parse_date_leap_year() {
let ts = parse_date_string("2024-02-29").expect("should parse leap day");
assert!(ts > 0.0);
assert!(parse_date_string("2023-02-29").is_none());
}
}