use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use chrono::{DateTime, NaiveDate, NaiveDateTime};
use csv::{ReaderBuilder, StringRecord, WriterBuilder};
use crate::error::{Error, Result};
use crate::table::{
Bitmap, BooleanCol, Column, DataType, DictionaryCol, Field, PrimitiveCol, Schema, Table,
Utf8Col,
};
/// The only accepted textual form for `Date32` values (ISO calendar date).
const DATE32_FORMAT: &str = "%Y-%m-%d";
/// Naive (timezone-less) timestamp formats tried in order, after RFC 3339
/// has already been attempted. Fractional-second variants come first; note
/// that chrono's `%.f` also matches an absent fraction, so the plain
/// `%H:%M:%S` entries act as redundant-but-harmless fallbacks.
const TIMESTAMP_FORMATS: [&str; 6] = [
    "%Y-%m-%d %H:%M:%S%.f",
    "%Y-%m-%dT%H:%M:%S%.f",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%d %H:%M",
    "%Y-%m-%dT%H:%M",
];
/// How the reader obtains the table schema.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SchemaMode {
    /// Require an explicit `CsvReadOptions::schema`; inference is rejected.
    Strict,
    /// Infer column types from a row sample; later parse failures are fatal.
    Infer,
    /// Infer types, then demote a column to its string fallback type (and
    /// re-read the file) if a later row fails to parse as the inferred type.
    InferThenPromote,
}
/// How string-typed columns are materialized.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StringEncoding {
    /// Dictionary-encode when the sampled unique/non-null ratio is low
    /// (see `InferColumn::string_fallback_dtype`); plain UTF-8 otherwise.
    Auto,
    /// Always build plain UTF-8 columns.
    Utf8,
    /// Always build dictionary-encoded columns.
    Dictionary,
}
/// Options controlling CSV ingestion.
#[derive(Clone, Debug)]
pub struct CsvReadOptions {
    // Field delimiter byte (e.g. b',' or b'\t').
    pub delimiter: u8,
    // Quote character byte passed to the underlying csv reader.
    pub quote: u8,
    // Whether the first record is a header row; when false, column names
    // are generated as `column_1`, `column_2`, ...
    pub has_header: bool,
    // Maximum number of records sampled for type inference.
    pub infer_rows: usize,
    // Tokens treated as null cells (matched raw or after trimming).
    pub nulls: Vec<String>,
    // Explicit schema; when set, inference is skipped entirely.
    pub schema: Option<Schema>,
    // Schema acquisition strategy; only consulted when `schema` is None.
    pub schema_mode: SchemaMode,
    // String column materialization strategy.
    pub string_encoding: StringEncoding,
    // Enable inferring `Date32` from `%Y-%m-%d` values during sampling.
    pub date_inference: bool,
    // Enable inferring `TimestampMs` from RFC 3339 / naive datetime values.
    pub timestamp_inference: bool,
}
impl Default for CsvReadOptions {
    /// Comma-delimited, headered input; infer from the first 16 384 rows with
    /// promotion enabled; treat "", "NULL" and "null" as nulls; choose string
    /// encoding automatically; date/timestamp inference off.
    fn default() -> Self {
        Self {
            delimiter: b',',
            quote: b'"',
            has_header: true,
            infer_rows: 16_384,
            nulls: vec!["".to_string(), "NULL".to_string(), "null".to_string()],
            schema: None,
            schema_mode: SchemaMode::InferThenPromote,
            string_encoding: StringEncoding::Auto,
            date_inference: false,
            timestamp_inference: false,
        }
    }
}
/// Options controlling CSV output.
#[derive(Clone, Debug)]
pub struct CsvWriteOptions {
    // Field delimiter byte for the output file.
    pub delimiter: u8,
    // Whether to emit a header row of column names.
    pub include_header: bool,
    // Text written for null cells (empty string by default).
    pub null_token: String,
}
impl Default for CsvWriteOptions {
    /// Comma-delimited output with a header row and empty-string nulls.
    fn default() -> Self {
        Self {
            delimiter: b',',
            include_header: true,
            null_token: String::new(),
        }
    }
}
/// Reads a CSV file into a [`Table`], inferring or applying a schema per
/// `options`.
///
/// With an inferred schema in [`SchemaMode::InferThenPromote`], a column
/// whose inferred type fails to parse on some row is demoted to its string
/// fallback type and the whole file is re-read. The promotion budget equals
/// the column count; string columns never fail to parse, so each column can
/// be promoted at most once and the loop always terminates.
///
/// # Errors
/// Returns `Error::Parse` on malformed records or exhausted promotion
/// budget, plus whatever `infer_plan`/`build_table` surface (I/O, schema).
pub fn read_path(path: impl AsRef<Path>, options: &CsvReadOptions) -> Result<Table> {
    let path = path.as_ref();
    let mut plan = infer_plan(path, options)?;
    if plan.fields.is_empty() {
        // Zero columns (empty file without an explicit schema).
        return Ok(Table::empty());
    }
    // Promotion only applies to inferred schemas in InferThenPromote mode.
    let allow_promotion =
        options.schema.is_none() && matches!(options.schema_mode, SchemaMode::InferThenPromote);
    let mut attempts_remaining = plan.fields.len();
    loop {
        match build_table(
            path,
            options,
            &plan,
            // With an explicit schema, declared nullability is preserved.
            options.schema.is_some(),
            allow_promotion,
        ) {
            Ok(table) => return Ok(table),
            Err(BuildFailure::Promote(column_index))
                if allow_promotion && attempts_remaining > 0 =>
            {
                // Demote the offending column to its string fallback, then retry.
                plan.fields[column_index].dtype = plan.fallback_string_types[column_index];
                attempts_remaining -= 1;
            }
            Err(BuildFailure::Promote(column_index)) => {
                return Err(Error::Parse(format!(
                    "failed to parse column '{}' as {}; promotion budget exhausted",
                    plan.fields[column_index].name, plan.fields[column_index].dtype
                )))
            }
            Err(BuildFailure::Fatal(error)) => return Err(error),
        }
    }
}
/// Writes a table to a CSV file.
///
/// The table is materialized first so that views/filters are resolved before
/// serialization. Nulls are rendered as `options.null_token`; dates and
/// timestamps are formatted by [`stringify_cell`].
///
/// # Errors
/// Propagates materialization failures and any csv/I/O error from the writer.
pub fn write_path(table: &Table, path: impl AsRef<Path>, options: &CsvWriteOptions) -> Result<()> {
    let destination = path.as_ref();
    let materialized = table.materialize()?;
    let mut writer = WriterBuilder::new()
        .delimiter(options.delimiter)
        .from_path(destination)?;
    if options.include_header {
        let mut header_cells = Vec::new();
        for field in materialized.schema().fields() {
            header_cells.push(field.name.as_ref());
        }
        writer.write_record(header_cells)?;
    }
    for row in 0..materialized.nrows() {
        let mut cells = Vec::with_capacity(materialized.columns().len());
        for column in materialized.columns() {
            cells.push(stringify_cell(column, row as u32, &options.null_token));
        }
        writer.write_record(cells)?;
    }
    // Flush explicitly so write errors surface here instead of being
    // swallowed by Drop.
    writer.flush()?;
    Ok(())
}
/// The schema plan used by `build_table`, mutated in place by `read_path`
/// when a column is promoted.
#[derive(Clone, Debug)]
struct BuildPlan {
    // Current (possibly already promoted) field definitions, one per column.
    fields: Vec<Field>,
    // Per-column string type (`Utf8` or `DictUtf8`) used when promotion demotes
    // a column that failed to parse as its inferred type.
    fallback_string_types: Vec<DataType>,
}
/// Scalar kind observed during sampling.
///
/// IMPORTANT: the variant order defines the widening lattice. The derived
/// `Ord` follows declaration order and `InferColumn::observe` combines
/// observations with `max`, so a later variant always wins and `Utf8` is the
/// top element. Note that `max` of two incompatible kinds (e.g. `Bool` and
/// `I64`) yields a type that cannot parse both; `InferThenPromote` recovers
/// from that by demoting the column to a string type at build time.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum InferredKind {
    Bool,
    I64,
    F64,
    Date32,
    TimestampMs,
    Utf8,
}
/// Per-column accumulator for the inference pass.
#[derive(Debug)]
struct InferColumn {
    // Widest kind seen so far; `None` until a non-null value is observed.
    kind: Option<InferredKind>,
    // True once any null token has been seen.
    nullable: bool,
    // Count of non-null sampled values (denominator of the dictionary ratio).
    non_null_count: usize,
    // Distinct raw (untrimmed) values, capped in `observe` to bound memory.
    unique_values: HashSet<String>,
}
impl InferColumn {
    /// Starts with no observations: unknown kind, not nullable, no uniques.
    fn new() -> Self {
        Self {
            kind: None,
            nullable: false,
            non_null_count: 0,
            unique_values: HashSet::new(),
        }
    }
    /// Folds one sampled cell into the accumulator.
    fn observe(&mut self, raw: &str, options: &CsvReadOptions) {
        if is_null_token(raw, &options.nulls) {
            self.nullable = true;
            return;
        }
        let trimmed = raw.trim();
        self.non_null_count += 1;
        // Cap the unique set so pathological columns cannot exhaust memory;
        // once past the cap the dictionary heuristic below can no longer
        // trigger anyway (65_537 > u16::MAX). Raw values are stored because
        // string builders also push raw (untrimmed) values.
        if self.unique_values.len() <= 65_536 {
            self.unique_values.insert(raw.to_string());
        }
        let observed_kind = infer_scalar_kind(trimmed, options);
        // Widen via `max` — see the ordering note on `InferredKind`.
        self.kind = Some(match self.kind {
            Some(current) => current.max(observed_kind),
            None => observed_kind,
        });
    }
    /// Maps the accumulated kind to a concrete dtype; all-null or string
    /// columns fall back to the configured string encoding.
    fn inferred_dtype(&self, string_encoding: &StringEncoding) -> DataType {
        match self.kind {
            Some(InferredKind::Bool) => DataType::Bool,
            Some(InferredKind::I64) => DataType::I64,
            Some(InferredKind::F64) => DataType::F64,
            Some(InferredKind::Date32) => DataType::Date32,
            Some(InferredKind::TimestampMs) => DataType::TimestampMs,
            Some(InferredKind::Utf8) | None => self.string_fallback_dtype(string_encoding),
        }
    }
    /// Chooses between plain UTF-8 and dictionary encoding.
    ///
    /// `Auto` picks a dictionary only when the sample shows at most
    /// `u16::MAX` distinct values AND a unique/non-null ratio of <= 20%.
    fn string_fallback_dtype(&self, string_encoding: &StringEncoding) -> DataType {
        match string_encoding {
            StringEncoding::Utf8 => DataType::Utf8,
            StringEncoding::Dictionary => DataType::DictUtf8,
            StringEncoding::Auto => {
                if self.non_null_count > 0 {
                    let unique = self.unique_values.len();
                    let ratio = unique as f64 / self.non_null_count as f64;
                    if unique <= u16::MAX as usize && ratio <= 0.20 {
                        return DataType::DictUtf8;
                    }
                }
                DataType::Utf8
            }
        }
    }
}
/// Internal failure channel for `build_table`.
enum BuildFailure {
    // Unrecoverable error, forwarded to the caller as-is.
    Fatal(Error),
    // Column at this index failed to parse and may be retried as a string
    // type (consumes one unit of `read_path`'s promotion budget).
    Promote(usize),
}
/// Type-dispatched column builder; one variant per supported `DataType`.
enum ColumnBuilder {
    Bool(BoolColumnBuilder),
    I64(PrimitiveColumnBuilder<i64>),
    F64(PrimitiveColumnBuilder<f64>),
    Utf8(Utf8ColumnBuilder),
    DictUtf8(DictionaryColumnBuilder),
    // Date32 values are stored as i32 days since the Unix epoch.
    Date32(PrimitiveColumnBuilder<i32>),
    // TimestampMs values are stored as i64 milliseconds since the Unix epoch.
    TimestampMs(PrimitiveColumnBuilder<i64>),
}
impl ColumnBuilder {
    /// Creates the builder variant matching `dtype`. The primitive builders
    /// take a placeholder value pushed in null slots.
    fn new(dtype: DataType) -> Self {
        match dtype {
            DataType::Bool => Self::Bool(BoolColumnBuilder::new()),
            DataType::I64 => Self::I64(PrimitiveColumnBuilder::new(0_i64)),
            DataType::F64 => Self::F64(PrimitiveColumnBuilder::new(0.0_f64)),
            DataType::Utf8 => Self::Utf8(Utf8ColumnBuilder::new()),
            DataType::DictUtf8 => Self::DictUtf8(DictionaryColumnBuilder::new()),
            DataType::Date32 => Self::Date32(PrimitiveColumnBuilder::new(0_i32)),
            DataType::TimestampMs => Self::TimestampMs(PrimitiveColumnBuilder::new(0_i64)),
        }
    }
    /// Parses one raw cell and appends it.
    ///
    /// Null tokens append a null (fatal if the field is declared non-null).
    /// Non-string parses run on the trimmed value; string builders keep the
    /// raw value verbatim and cannot fail. A parse failure becomes either
    /// `Promote(column_index)` or a fatal error, per `allow_promotion`.
    fn push(
        &mut self,
        raw: &str,
        field: &Field,
        options: &CsvReadOptions,
        column_index: usize,
        allow_promotion: bool,
    ) -> std::result::Result<(), BuildFailure> {
        if is_null_token(raw, &options.nulls) {
            if !field.nullable {
                return Err(BuildFailure::Fatal(Error::Parse(format!(
                    "column '{}' is not nullable but encountered a null token",
                    field.name
                ))));
            }
            self.push_null();
            return Ok(());
        }
        let trimmed = raw.trim();
        match self {
            Self::Bool(builder) => parse_bool(trimmed)
                .map(|value| builder.push(value))
                .ok_or_else(|| parse_failure(field, trimmed, column_index, allow_promotion)),
            Self::I64(builder) => trimmed
                .parse::<i64>()
                .map(|value| builder.push(value))
                .map_err(|_| parse_failure(field, trimmed, column_index, allow_promotion)),
            Self::F64(builder) => trimmed
                .parse::<f64>()
                .map(|value| builder.push(value))
                .map_err(|_| parse_failure(field, trimmed, column_index, allow_promotion)),
            Self::Utf8(builder) => {
                builder.push(raw);
                Ok(())
            }
            Self::DictUtf8(builder) => {
                builder.push(raw);
                Ok(())
            }
            Self::Date32(builder) => {
                // Accept either a raw day count or a formatted %Y-%m-%d date.
                if let Ok(value) = trimmed.parse::<i32>() {
                    builder.push(value);
                    Ok(())
                } else if let Some(value) = parse_date32(trimmed) {
                    builder.push(value);
                    Ok(())
                } else {
                    Err(parse_failure(field, trimmed, column_index, allow_promotion))
                }
            }
            Self::TimestampMs(builder) => {
                // Accept raw milliseconds, a timestamp string, or (because
                // allow_date_only is true here) a bare date at midnight UTC.
                if let Ok(value) = trimmed.parse::<i64>() {
                    builder.push(value);
                    Ok(())
                } else if let Some(value) = parse_timestamp_ms(trimmed, true) {
                    builder.push(value);
                    Ok(())
                } else {
                    Err(parse_failure(field, trimmed, column_index, allow_promotion))
                }
            }
        }
    }
    /// Appends a null slot to whichever variant this is.
    fn push_null(&mut self) {
        match self {
            Self::Bool(builder) => builder.push_null(),
            Self::I64(builder) => builder.push_null(),
            Self::F64(builder) => builder.push_null(),
            Self::Utf8(builder) => builder.push_null(),
            Self::DictUtf8(builder) => builder.push_null(),
            Self::Date32(builder) => builder.push_null(),
            Self::TimestampMs(builder) => builder.push_null(),
        }
    }
    /// True once any null has been appended (used to set field nullability).
    fn has_nulls(&self) -> bool {
        match self {
            Self::Bool(builder) => builder.has_nulls,
            Self::I64(builder) => builder.has_nulls,
            Self::F64(builder) => builder.has_nulls,
            Self::Utf8(builder) => builder.has_nulls,
            Self::DictUtf8(builder) => builder.has_nulls,
            Self::Date32(builder) => builder.has_nulls,
            Self::TimestampMs(builder) => builder.has_nulls,
        }
    }
    /// Consumes the builder and produces the finished column.
    fn finish(self) -> Result<Column> {
        match self {
            Self::Bool(builder) => Ok(Column::Bool(builder.finish())),
            Self::I64(builder) => Ok(Column::I64(builder.finish())),
            Self::F64(builder) => Ok(Column::F64(builder.finish())),
            Self::Utf8(builder) => Ok(Column::Utf8(builder.finish()?)),
            Self::DictUtf8(builder) => Ok(Column::DictUtf8(builder.finish()?)),
            Self::Date32(builder) => Ok(Column::Date32(builder.finish())),
            Self::TimestampMs(builder) => Ok(Column::TimestampMs(builder.finish())),
        }
    }
}
/// Accumulates fixed-width values plus a parallel validity vector.
struct PrimitiveColumnBuilder<T> {
    values: Vec<T>,
    validity: Vec<bool>,
    // Value written into null slots so `values` stays dense.
    null_placeholder: T,
    has_nulls: bool,
}
impl<T: Copy> PrimitiveColumnBuilder<T> {
    /// Empty builder; `null_placeholder` fills null slots later.
    fn new(null_placeholder: T) -> Self {
        Self {
            values: Vec::new(),
            validity: Vec::new(),
            null_placeholder,
            has_nulls: false,
        }
    }
    /// Appends a valid value.
    fn push(&mut self, value: T) {
        self.validity.push(true);
        self.values.push(value);
    }
    /// Appends a null slot (placeholder value, validity false).
    fn push_null(&mut self) {
        self.has_nulls = true;
        self.validity.push(false);
        self.values.push(self.null_placeholder);
    }
    /// Produces the finished primitive column; the validity bitmap is only
    /// materialized when at least one null was pushed.
    fn finish(self) -> PrimitiveCol<T> {
        let validity = validity_from_vec(self.validity, self.has_nulls);
        PrimitiveCol::new(self.values, validity)
    }
}
/// Accumulates booleans, packed into a bitmap on finish.
struct BoolColumnBuilder {
    values: Vec<bool>,
    validity: Vec<bool>,
    has_nulls: bool,
}
impl BoolColumnBuilder {
    /// Empty builder.
    fn new() -> Self {
        Self {
            values: Vec::new(),
            validity: Vec::new(),
            has_nulls: false,
        }
    }
    /// Appends a valid boolean.
    fn push(&mut self, value: bool) {
        self.validity.push(true);
        self.values.push(value);
    }
    /// Appends a null slot (`false` placeholder, validity false).
    fn push_null(&mut self) {
        self.has_nulls = true;
        self.validity.push(false);
        self.values.push(false);
    }
    /// Packs values into a bitmap; validity is only materialized when a null
    /// was pushed.
    fn finish(self) -> BooleanCol {
        let packed = Bitmap::from_bools(&self.values);
        let validity = validity_from_vec(self.validity, self.has_nulls);
        BooleanCol::new(packed, validity)
    }
}
/// Accumulates strings as a contiguous byte buffer plus offsets
/// (offsets.len() == rows + 1; row i spans offsets[i]..offsets[i+1]).
struct Utf8ColumnBuilder {
    offsets: Vec<u32>,
    bytes: Vec<u8>,
    validity: Vec<bool>,
    has_nulls: bool,
}
impl Utf8ColumnBuilder {
    /// Empty builder; the leading 0 offset is the start of row 0.
    fn new() -> Self {
        Self {
            offsets: vec![0],
            bytes: Vec::new(),
            validity: Vec::new(),
            has_nulls: false,
        }
    }
    /// Appends a valid string verbatim.
    // NOTE(review): offsets are u32, so columns holding > 4 GiB of string
    // data would silently wrap via `as u32` — presumably total size is
    // bounded upstream; confirm.
    fn push(&mut self, value: &str) {
        self.bytes.extend_from_slice(value.as_bytes());
        let end = self.bytes.len() as u32;
        self.offsets.push(end);
        self.validity.push(true);
    }
    /// Appends a null: a zero-length slot (repeat the current end offset).
    fn push_null(&mut self) {
        let end = self.bytes.len() as u32;
        self.offsets.push(end);
        self.validity.push(false);
        self.has_nulls = true;
    }
    /// Produces the finished UTF-8 column; `Utf8Col::new` may reject invalid
    /// layout, hence the Result.
    fn finish(self) -> Result<Utf8Col> {
        let validity = validity_from_vec(self.validity, self.has_nulls);
        Utf8Col::new(self.offsets, self.bytes, validity)
    }
}
/// Accumulates dictionary-encoded strings: per-row u32 keys into a distinct
/// values table, interned through a HashMap.
struct DictionaryColumnBuilder {
    keys: Vec<u32>,
    validity: Vec<bool>,
    dictionary_index: HashMap<String, u32>,
    dictionary_values: Vec<String>,
    has_nulls: bool,
}
impl DictionaryColumnBuilder {
    /// Empty builder.
    fn new() -> Self {
        Self {
            keys: Vec::new(),
            validity: Vec::new(),
            dictionary_index: HashMap::new(),
            dictionary_values: Vec::new(),
            has_nulls: false,
        }
    }
    /// Appends a valid string, interning it on first sight.
    ///
    /// The borrowed `get` lookup comes first so repeated values (the common
    /// case for dictionary columns) allocate nothing.
    fn push(&mut self, value: &str) {
        let key = match self.dictionary_index.get(value) {
            Some(existing) => *existing,
            None => {
                let next_key = self.dictionary_values.len() as u32;
                self.dictionary_values.push(value.to_string());
                self.dictionary_index.insert(value.to_string(), next_key);
                next_key
            }
        };
        self.keys.push(key);
        self.validity.push(true);
    }
    /// Appends a null slot (key 0 placeholder, validity false).
    fn push_null(&mut self) {
        self.has_nulls = true;
        self.validity.push(false);
        self.keys.push(0);
    }
    /// Produces the finished dictionary column; the values table is packed
    /// into a Utf8Col first.
    fn finish(self) -> Result<DictionaryCol<u32>> {
        let values = utf8_from_strings(&self.dictionary_values)?;
        let validity = validity_from_vec(self.validity, self.has_nulls);
        Ok(DictionaryCol::new(self.keys, values, validity))
    }
}
/// Produces the `BuildPlan` for a file: either the explicit schema from
/// `options`, or a schema inferred from a sample of rows.
///
/// When inferring, types are sampled from at most `options.infer_rows`
/// records, but the loop intentionally continues over the WHOLE file so that
/// ragged rows (width mismatches) are rejected up front rather than midway
/// through building.
///
/// # Errors
/// `InvalidArgument` for `SchemaMode::Strict` without a schema, `Parse` for
/// width mismatches, plus reader/I-O errors.
fn infer_plan(path: &Path, options: &CsvReadOptions) -> Result<BuildPlan> {
    if let Some(schema) = &options.schema {
        // Explicit schema: no inference, just check the column count matches.
        validate_schema_width(path, options, schema)?;
        let fallback_string_type = match options.string_encoding {
            StringEncoding::Dictionary => DataType::DictUtf8,
            _ => DataType::Utf8,
        };
        return Ok(BuildPlan {
            fields: schema.fields().to_vec(),
            fallback_string_types: vec![fallback_string_type; schema.len()],
        });
    }
    if matches!(options.schema_mode, SchemaMode::Strict) {
        return Err(Error::InvalidArgument(
            "SchemaMode::Strict requires an explicit schema".to_string(),
        ));
    }
    let mut reader = open_reader(path, options)?;
    let mut headers = if options.has_header {
        reader
            .headers()?
            .iter()
            .map(|value| value.to_string())
            .collect()
    } else {
        // Headerless: names are generated from the first record's width below.
        Vec::new()
    };
    let mut infer_columns = headers
        .iter()
        .map(|_| InferColumn::new())
        .collect::<Vec<_>>();
    for (sampled_rows, record) in reader.records().enumerate() {
        let record = record?;
        if headers.is_empty() {
            headers = generated_headers(record.len());
            infer_columns = headers.iter().map(|_| InferColumn::new()).collect();
        }
        if record.len() != headers.len() {
            return Err(Error::Parse(format!(
                "record width {} does not match header width {}",
                record.len(),
                headers.len()
            )));
        }
        // Only the first `infer_rows` records feed type inference; the rest
        // are read purely for the width check above.
        if sampled_rows < options.infer_rows {
            observe_record(&record, &mut infer_columns, options);
        }
    }
    if headers.is_empty() {
        // No header and no records at all: empty plan -> empty table.
        return Ok(BuildPlan {
            fields: Vec::new(),
            fallback_string_types: Vec::new(),
        });
    }
    let fields = headers
        .iter()
        .zip(infer_columns.iter())
        .map(|(header, column)| {
            Field::new(
                Arc::<str>::from(header.as_str()),
                column.inferred_dtype(&options.string_encoding),
            )
            .with_nullability(column.nullable)
        })
        .collect::<Vec<_>>();
    // Precompute each column's string fallback so promotion is a lookup.
    let fallback_string_types = infer_columns
        .iter()
        .map(|column| column.string_fallback_dtype(&options.string_encoding))
        .collect::<Vec<_>>();
    Ok(BuildPlan {
        fields,
        fallback_string_types,
    })
}
/// Second pass: re-reads the file and materializes columns per `plan`.
///
/// `preserve_nullability` (true when an explicit schema was supplied) keeps
/// the declared nullability and ORs in observed nulls; otherwise nullability
/// is derived purely from the data. Returns `BuildFailure::Promote` when a
/// cell fails to parse and promotion is allowed, so `read_path` can demote
/// the column and retry.
fn build_table(
    path: &Path,
    options: &CsvReadOptions,
    plan: &BuildPlan,
    preserve_nullability: bool,
    allow_promotion: bool,
) -> std::result::Result<Table, BuildFailure> {
    let mut reader = open_reader(path, options).map_err(BuildFailure::Fatal)?;
    let mut builders = plan
        .fields
        .iter()
        .map(|field| ColumnBuilder::new(field.dtype))
        .collect::<Vec<_>>();
    for record in reader.records() {
        let record =
            record.map_err(|error| BuildFailure::Fatal(Error::Parse(error.to_string())))?;
        if record.len() != builders.len() {
            return Err(BuildFailure::Fatal(Error::Parse(format!(
                "record width {} does not match schema width {}",
                record.len(),
                builders.len()
            ))));
        }
        for (index, raw) in record.iter().enumerate() {
            builders[index].push(raw, &plan.fields[index], options, index, allow_promotion)?;
        }
    }
    let mut fields = Vec::with_capacity(plan.fields.len());
    let mut columns = Vec::with_capacity(plan.fields.len());
    for (index, builder) in builders.into_iter().enumerate() {
        let mut field = plan.fields[index].clone();
        // Final nullability: declared || observed with an explicit schema,
        // observed-only when the schema was inferred.
        field.nullable = if preserve_nullability {
            field.nullable || builder.has_nulls()
        } else {
            builder.has_nulls()
        };
        fields.push(field);
        columns.push(builder.finish().map_err(BuildFailure::Fatal)?);
    }
    Table::from_columns(Schema::new(fields).map_err(BuildFailure::Fatal)?, columns)
        .map_err(BuildFailure::Fatal)
}
/// Checks that an explicit schema's column count matches the file's width.
///
/// Width comes from the header row when present, otherwise from the first
/// record; an entirely empty headerless file trivially passes (width
/// defaults to the schema's own length).
fn validate_schema_width(path: &Path, options: &CsvReadOptions, schema: &Schema) -> Result<()> {
    let mut reader = open_reader(path, options)?;
    let width = if options.has_header {
        reader.headers()?.len()
    } else {
        match reader.records().next() {
            Some(record) => record?.len(),
            None => schema.len(),
        }
    };
    if width != schema.len() {
        return Err(Error::Schema(format!(
            "schema has {} fields but csv width is {}",
            schema.len(),
            width
        )));
    }
    Ok(())
}
/// Opens a csv reader over `path` configured from `options`
/// (delimiter, quote, header handling). Open failures surface as `Error::Io`.
fn open_reader(path: &Path, options: &CsvReadOptions) -> Result<csv::Reader<std::fs::File>> {
    let mut builder = ReaderBuilder::new();
    builder
        .delimiter(options.delimiter)
        .quote(options.quote)
        .has_headers(options.has_header);
    builder
        .from_path(path)
        .map_err(|error| Error::Io(error.to_string()))
}
/// Produces synthetic header names `column_1` .. `column_<width>` for
/// headerless files (1-based to match common spreadsheet conventions).
fn generated_headers(width: usize) -> Vec<String> {
    let mut headers = Vec::with_capacity(width);
    for index in 1..=width {
        headers.push(format!("column_{index}"));
    }
    headers
}
/// Feeds each field of one record into the matching inference accumulator.
/// Extra fields (if the record is wider than the accumulator slice) are
/// ignored here; width mismatches are rejected by the caller.
fn observe_record(
    record: &StringRecord,
    infer_columns: &mut [InferColumn],
    options: &CsvReadOptions,
) {
    for (column, raw) in infer_columns.iter_mut().zip(record.iter()) {
        column.observe(raw, options);
    }
}
/// Classifies one non-null, trimmed value.
///
/// Parser order matters: bool before i64, i64 before f64 (every i64 is also
/// a valid f64), and date before timestamp so bare dates prefer `Date32`
/// when both inference flags are enabled. Date/timestamp checks run only
/// when their respective options are on.
fn infer_scalar_kind(value: &str, options: &CsvReadOptions) -> InferredKind {
    if parse_bool(value).is_some() {
        InferredKind::Bool
    } else if value.parse::<i64>().is_ok() {
        InferredKind::I64
    } else if value.parse::<f64>().is_ok() {
        InferredKind::F64
    } else if options.date_inference && parse_date32(value).is_some() {
        InferredKind::Date32
    } else if options.timestamp_inference && parse_timestamp_ms(value, false).is_some() {
        InferredKind::TimestampMs
    } else {
        InferredKind::Utf8
    }
}
/// Parses a strict `%Y-%m-%d` date into days since 1970-01-01.
///
/// Returns `None` when the text is not a valid date in that exact format or
/// when the day count does not fit in i32.
fn parse_date32(value: &str) -> Option<i32> {
    let date = NaiveDate::parse_from_str(value, DATE32_FORMAT).ok()?;
    // from_ymd_opt(1970, 1, 1) is always Some; kept as Option for uniform `?`.
    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1)?;
    date.signed_duration_since(epoch).num_days().try_into().ok()
}
/// Parses a timestamp into milliseconds since the Unix epoch.
///
/// Tries RFC 3339 first (honoring its offset), then the naive
/// `TIMESTAMP_FORMATS` interpreted as UTC. With `allow_date_only`, a bare
/// `%Y-%m-%d` date is accepted as midnight UTC.
fn parse_timestamp_ms(value: &str, allow_date_only: bool) -> Option<i64> {
    if let Ok(zoned) = DateTime::parse_from_rfc3339(value) {
        return Some(zoned.timestamp_millis());
    }
    let naive = TIMESTAMP_FORMATS
        .iter()
        .find_map(|format| NaiveDateTime::parse_from_str(value, format).ok());
    if let Some(timestamp) = naive {
        return Some(timestamp.and_utc().timestamp_millis());
    }
    if !allow_date_only {
        return None;
    }
    // 86_400_000 ms per day.
    parse_date32(value).map(|days| i64::from(days) * 86_400_000)
}
/// Formats days-since-epoch as `%Y-%m-%d`.
///
/// Returns `None` only when the day offset falls outside chrono's
/// representable date range.
fn format_date32(value: i32) -> Option<String> {
    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1)?;
    epoch
        .checked_add_signed(chrono::Duration::days(i64::from(value)))
        .map(|date| date.format("%Y-%m-%d").to_string())
}
/// Formats epoch milliseconds via `NaiveDateTime`'s `Display` in UTC
/// (e.g. "2024-01-02 03:04:05", with a fractional part only when nonzero).
/// Returns `None` for out-of-range millisecond values.
fn format_timestamp_ms(value: i64) -> Option<String> {
    DateTime::from_timestamp_millis(value).map(|timestamp| timestamp.naive_utc().to_string())
}
/// Parses a permissive boolean token.
///
/// Accepts `true`/`t`/`yes`/`y` and `false`/`f`/`no`/`n`, ASCII
/// case-insensitively; anything else is `None`. Uses `eq_ignore_ascii_case`
/// instead of `to_ascii_lowercase` so this per-cell hot path does not
/// allocate a fresh `String` for every value.
fn parse_bool(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["true", "t", "yes", "y"];
    const FALSY: [&str; 4] = ["false", "f", "no", "n"];
    if TRUTHY.iter().any(|token| value.eq_ignore_ascii_case(token)) {
        Some(true)
    } else if FALSY.iter().any(|token| value.eq_ignore_ascii_case(token)) {
        Some(false)
    } else {
        None
    }
}
/// Reports whether a raw cell should be treated as null.
///
/// A cell matches when it equals any configured token either verbatim or
/// after trimming surrounding whitespace — so `"  NULL "` matches token
/// `"NULL"`, while a token that itself contains whitespace can still match
/// the raw, untrimmed cell.
fn is_null_token(value: &str, null_tokens: &[String]) -> bool {
    let trimmed = value.trim();
    for token in null_tokens {
        if token.as_str() == value || token.as_str() == trimmed {
            return true;
        }
    }
    false
}
fn parse_failure(
field: &Field,
value: &str,
column_index: usize,
allow_promotion: bool,
) -> BuildFailure {
if allow_promotion {
BuildFailure::Promote(column_index)
} else {
BuildFailure::Fatal(Error::Parse(format!(
"failed to parse value '{value}' for column '{}' as {}",
field.name, field.dtype
)))
}
}
/// Converts a per-row validity vector into an optional bitmap.
///
/// Fully-valid columns carry no validity buffer at all, so `None` is
/// returned unless at least one null was recorded.
fn validity_from_vec(validity: Vec<bool>, has_nulls: bool) -> Option<Bitmap> {
    has_nulls.then(|| Bitmap::from_bools(&validity))
}
/// Packs a slice of strings into a dense `Utf8Col` (no nulls).
// NOTE(review): offsets are u32, so > 4 GiB of total string data would wrap
// via `as u32` — presumably dictionary value tables stay far below that.
fn utf8_from_strings(values: &[String]) -> Result<Utf8Col> {
    let total_bytes: usize = values.iter().map(|value| value.len()).sum();
    let mut bytes = Vec::with_capacity(total_bytes);
    let mut offsets = Vec::with_capacity(values.len() + 1);
    offsets.push(0);
    for value in values {
        bytes.extend_from_slice(value.as_bytes());
        offsets.push(bytes.len() as u32);
    }
    Utf8Col::new(offsets, bytes, None)
}
/// Renders one cell as CSV text.
///
/// Nulls become `null_token`; dates are formatted `%Y-%m-%d`; timestamps use
/// `format_timestamp_ms` (naive UTC display). The per-arm `unwrap_or_else`
/// fallbacks also yield the null token if a typed accessor unexpectedly
/// returns `None`.
fn stringify_cell(column: &Column, row_index: u32, null_token: &str) -> String {
    if column.is_null(row_index) {
        return null_token.to_string();
    }
    match column {
        Column::Bool(_) => column
            .bool_value(row_index)
            .map(|value| value.to_string())
            .unwrap_or_else(|| null_token.to_string()),
        Column::I64(_) => column
            .i64_value(row_index)
            .map(|value| value.to_string())
            .unwrap_or_else(|| null_token.to_string()),
        Column::TimestampMs(_) => column
            .i64_value(row_index)
            .and_then(format_timestamp_ms)
            .unwrap_or_else(|| null_token.to_string()),
        Column::Date32(_) => column
            .i32_value(row_index)
            .and_then(format_date32)
            .unwrap_or_else(|| null_token.to_string()),
        Column::F64(_) => column
            .f64_value(row_index)
            .map(|value| value.to_string())
            .unwrap_or_else(|| null_token.to_string()),
        Column::Utf8(_) | Column::DictUtf8(_) => column
            .utf8_value(row_index)
            .map(|value| value.to_string())
            .unwrap_or_else(|| null_token.to_string()),
    }
}
#[cfg(test)]
mod tests {
    use std::fs;
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};
    use super::{read_path, write_path, CsvReadOptions, CsvWriteOptions, StringEncoding};
    use crate::ops::{ColumnSelector, CompareOp, Literal, Predicate};
    use crate::table::{Column, DataType, Field, Schema};
    // End-to-end: default options infer i64/bool/f64/utf8 and filtering works.
    #[test]
    fn reads_typed_columns_and_filters_rows() {
        let path = write_temp_csv(
            "id,active,score,name\n1,true,1.5,Alice\n2,false,,Bob\n3,true,3.25,Alicia\n",
        );
        let table = read_path(&path, &CsvReadOptions::default()).expect("csv should load");
        let dtypes = table
            .schema()
            .fields()
            .iter()
            .map(|field| field.dtype)
            .collect::<Vec<_>>();
        assert_eq!(
            dtypes,
            vec![DataType::I64, DataType::Bool, DataType::F64, DataType::Utf8]
        );
        // active == true AND score > 2.0 should keep only the third row.
        let predicate = Predicate::And(vec![
            Predicate::Comparison {
                column: ColumnSelector::from("active"),
                op: CompareOp::Eq,
                value: Some(Literal::Bool(true)),
            },
            Predicate::Comparison {
                column: ColumnSelector::from("score"),
                op: CompareOp::Gt,
                value: Some(Literal::F64(2.0)),
            },
        ]);
        let filtered = table
            .filter(&predicate)
            .expect("filter should succeed")
            .materialize()
            .expect("materialize should succeed");
        assert_eq!(filtered.nrows(), 1);
        assert_eq!(
            filtered
                .column_by_name("id")
                .and_then(|column| column.i64_value(0)),
            Some(3)
        );
        assert_eq!(
            filtered
                .column_by_name("name")
                .and_then(|column| column.utf8_value(0)),
            Some("Alicia")
        );
        std::fs::remove_file(path).expect("temp csv should be removable");
    }
    // StringEncoding::Dictionary must force dictionary columns regardless of
    // the Auto heuristic.
    #[test]
    fn forces_dictionary_encoding_when_requested() {
        let path = write_temp_csv("name\nalpha\nalpha\nbeta\n");
        let options = CsvReadOptions {
            string_encoding: StringEncoding::Dictionary,
            ..CsvReadOptions::default()
        };
        let table = read_path(&path, &options).expect("csv should load");
        match table.column_by_name("name") {
            Some(Column::DictUtf8(_)) => {}
            other => panic!("expected dictionary column, got {other:?}"),
        }
        std::fs::remove_file(path).expect("temp csv should be removable");
    }
    // Round trip: a filtered view written out and re-read keeps the data.
    #[test]
    fn writes_filtered_views_back_to_csv() {
        let input_path = write_temp_csv("id,name\n1,Alice\n2,Bob\n");
        let output_path = temp_csv_path();
        let table = read_path(&input_path, &CsvReadOptions::default()).expect("csv should load");
        let filtered = table
            .filter(&Predicate::Comparison {
                column: ColumnSelector::from("name"),
                op: CompareOp::StartsWith,
                value: Some(Literal::from("A")),
            })
            .expect("filter should succeed");
        write_path(&filtered, &output_path, &CsvWriteOptions::default())
            .expect("writer should succeed");
        let round_trip = read_path(&output_path, &CsvReadOptions::default()).expect("csv reload");
        assert_eq!(round_trip.nrows(), 1);
        assert_eq!(
            round_trip
                .column_by_name("name")
                .and_then(|column| column.utf8_value(0)),
            Some("Alice")
        );
        std::fs::remove_file(input_path).expect("temp input csv should be removable");
        std::fs::remove_file(output_path).expect("temp output csv should be removable");
    }
    // Opt-in temporal inference yields Date32/TimestampMs columns with values
    // matching the module's own parsers.
    #[test]
    fn infers_date32_and_timestamp_ms_when_enabled() {
        let path = write_temp_csv(
            "day,event_at\n2024-01-02,2024-01-02T03:04:05Z\n2024-01-03,2024-01-03 04:05:06\n",
        );
        let options = CsvReadOptions {
            date_inference: true,
            timestamp_inference: true,
            ..CsvReadOptions::default()
        };
        let table = read_path(&path, &options).expect("csv should load");
        let dtypes = table
            .schema()
            .fields()
            .iter()
            .map(|field| field.dtype)
            .collect::<Vec<_>>();
        assert_eq!(dtypes, vec![DataType::Date32, DataType::TimestampMs]);
        assert_eq!(
            table
                .column_by_name("day")
                .and_then(|column| column.i32_value(0)),
            super::parse_date32("2024-01-02")
        );
        assert_eq!(
            table
                .column_by_name("event_at")
                .and_then(|column| column.i64_value(0)),
            super::parse_timestamp_ms("2024-01-02T03:04:05Z", true)
        );
        std::fs::remove_file(path).expect("temp csv should be removable");
    }
    // Temporal columns serialize back out as human-readable ISO-like strings.
    #[test]
    fn writes_inferred_dates_and_timestamps_as_iso_strings() {
        let input_path = write_temp_csv("day,event_at\n2024-01-02,2024-01-02T03:04:05Z\n");
        let output_path = temp_csv_path();
        let options = CsvReadOptions {
            date_inference: true,
            timestamp_inference: true,
            ..CsvReadOptions::default()
        };
        let table = read_path(&input_path, &options).expect("csv should load");
        write_path(&table, &output_path, &CsvWriteOptions::default())
            .expect("writer should succeed");
        let written = fs::read_to_string(&output_path).expect("output csv should be readable");
        assert!(written.contains("2024-01-02"));
        assert!(written.contains("2024-01-02 03:04:05"));
        std::fs::remove_file(input_path).expect("temp input csv should be removable");
        std::fs::remove_file(output_path).expect("temp output csv should be removable");
    }
    // Every documented timestamp shape must parse.
    #[test]
    fn supports_documented_timestamp_formats() {
        for value in [
            "2024-01-02T03:04:05Z",
            "2024-01-02T03:04:05+05:30",
            "2024-01-02 03:04:05.123",
            "2024-01-02T03:04:05.123",
            "2024-01-02 03:04:05",
            "2024-01-02T03:04:05",
            "2024-01-02 03:04",
            "2024-01-02T03:04",
        ] {
            assert!(
                super::parse_timestamp_ms(value, false).is_some(),
                "expected '{value}' to be accepted"
            );
        }
    }
    // With both temporal flags on, a bare date infers as Date32 (date check
    // runs before the timestamp check).
    #[test]
    fn date_only_values_prefer_date32_inference() {
        let path = write_temp_csv("day\n2024-01-02\n2024-01-03\n");
        let options = CsvReadOptions {
            date_inference: true,
            timestamp_inference: true,
            ..CsvReadOptions::default()
        };
        let table = read_path(&path, &options).expect("csv should load");
        assert_eq!(
            table.schema().field(0).map(|field| field.dtype),
            Some(DataType::Date32)
        );
        std::fs::remove_file(path).expect("temp csv should be removable");
    }
    // An explicit TimestampMs schema accepts bare dates as midnight UTC.
    #[test]
    fn explicit_timestamp_columns_accept_date_only_values() {
        let path = write_temp_csv("event_at\n2024-01-02\n");
        let options = CsvReadOptions {
            schema: Some(
                Schema::new(vec![Field::new("event_at", DataType::TimestampMs)])
                    .expect("schema should be valid"),
            ),
            ..CsvReadOptions::default()
        };
        let table = read_path(&path, &options).expect("csv should load");
        assert_eq!(
            table
                .column_by_name("event_at")
                .and_then(|column| column.i64_value(0)),
            super::parse_timestamp_ms("2024-01-02", true)
        );
        std::fs::remove_file(path).expect("temp csv should be removable");
    }
    // Helper: writes `contents` to a fresh temp file and returns its path.
    fn write_temp_csv(contents: &str) -> PathBuf {
        let path = temp_csv_path();
        fs::write(&path, contents).expect("temp csv should be writable");
        path
    }
    // Helper: unique temp path derived from a nanosecond timestamp.
    fn temp_csv_path() -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock should be after epoch")
            .as_nanos();
        std::env::temp_dir().join(format!("rgwml_test_{nanos}.csv"))
    }
}