use std::io::BufRead;
use crate::dataframe::DataFrame;
use crate::series::Series;
use crate::VeloxxError;
use memmap2::Mmap;
use rayon::prelude::*;
use std::fs::File;
pub struct UltraFastCsvParser {
delimiter: u8,
quote: u8,
escape: u8,
infer_types: bool,
_buffer_size: usize,
}
impl Default for UltraFastCsvParser {
fn default() -> Self {
Self {
delimiter: b',',
quote: b'"',
escape: b'\\',
infer_types: true,
_buffer_size: 64 * 1024, }
}
}
impl UltraFastCsvParser {
pub fn new() -> Self {
Self::default()
}
pub fn delimiter(mut self, delimiter: u8) -> Self {
self.delimiter = delimiter;
self
}
pub fn quote(mut self, quote: u8) -> Self {
self.quote = quote;
self
}
pub fn infer_types(mut self, infer: bool) -> Self {
self.infer_types = infer;
self
}
pub fn read_file(&self, path: &str) -> Result<DataFrame, VeloxxError> {
let file = File::open(path)
.map_err(|e| VeloxxError::FileIO(format!("Failed to open file: {}", e)))?;
let mmap = unsafe { Mmap::map(&file).map_err(|e| VeloxxError::FileIO(e.to_string()))? };
if mmap.is_empty() {
return Err(VeloxxError::InvalidOperation("Empty CSV file".to_string()));
}
self.parse_bytes(&mmap)
}
pub fn parse_bytes(&self, bytes: &[u8]) -> Result<DataFrame, VeloxxError> {
let header_end = bytes
.iter()
.position(|&b| b == b'\n')
.unwrap_or(bytes.len());
let header_line = std::str::from_utf8(&bytes[0..header_end]).unwrap_or("");
let headers = self.parse_csv_line(header_line)?;
let num_columns = headers.len();
let data_start = if header_end < bytes.len() {
header_end + 1
} else {
bytes.len()
};
let data_bytes = &bytes[data_start..];
if data_bytes.is_empty() {
return Ok(DataFrame::new(indexmap::IndexMap::new()));
}
let num_threads = rayon::current_num_threads();
let chunk_size = (data_bytes.len() / num_threads).max(1024 * 1024);
let mut chunk_starts = Vec::with_capacity(num_threads + 1);
chunk_starts.push(0);
let mut current_pos = 0;
for _ in 0..num_threads - 1 {
let target = current_pos + chunk_size;
if target >= data_bytes.len() {
break;
}
if let Some(newline_pos) = data_bytes[target..].iter().position(|&b| b == b'\n') {
let split_point = target + newline_pos + 1;
chunk_starts.push(split_point);
current_pos = split_point;
} else {
break;
}
}
chunk_starts.push(data_bytes.len());
let chunks: Vec<&[u8]> = chunk_starts
.windows(2)
.map(|w| &data_bytes[w[0]..w[1]])
.collect();
let parsed_chunks: Vec<Vec<Vec<String>>> = chunks
.par_iter()
.map(|chunk| {
let mut chunk_cols = vec![Vec::new(); num_columns];
let s = std::str::from_utf8(chunk).unwrap_or(""); for line in s.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(fields) = self.parse_csv_line(line) {
if fields.len() == num_columns {
for (i, field) in fields.into_iter().enumerate() {
chunk_cols[i].push(field);
}
}
}
}
chunk_cols
})
.collect();
let mut final_columns_data = vec![Vec::new(); num_columns];
for chunk_res in parsed_chunks {
for (col_idx, col_data) in chunk_res.into_iter().enumerate() {
final_columns_data[col_idx].extend(col_data);
}
}
let mut dataframe_columns = indexmap::IndexMap::new();
for (col_idx, column_name) in headers.iter().enumerate() {
let raw_data = &final_columns_data[col_idx];
if self.infer_types {
let series = self.infer_and_convert_column(column_name, raw_data)?;
dataframe_columns.insert(column_name.clone(), series);
} else {
let string_data: Vec<Option<String>> =
raw_data.iter().map(|s| Some(s.clone())).collect();
let series = Series::new_string(column_name, string_data);
dataframe_columns.insert(column_name.clone(), series);
}
}
Ok(DataFrame::new(dataframe_columns))
}
pub fn read_from_reader<R: BufRead>(&self, reader: R) -> Result<DataFrame, VeloxxError> {
let mut lines = reader.lines();
let header_line = lines
.next()
.ok_or_else(|| VeloxxError::InvalidOperation("Empty CSV file".to_string()))?
.map_err(|e| VeloxxError::FileIO(format!("Failed to read header: {}", e)))?;
let headers = self.parse_csv_line(&header_line)?;
let num_columns = headers.len();
let mut columns_data: Vec<Vec<String>> = vec![Vec::new(); num_columns];
let mut row_count = 0;
for line_result in lines {
let line = line_result
.map_err(|e| VeloxxError::FileIO(format!("Failed to read line: {}", e)))?;
if line.trim().is_empty() {
continue;
}
let fields = self.parse_csv_line(&line)?;
if fields.len() != num_columns {
return Err(VeloxxError::InvalidOperation(format!(
"Row {} has {} fields, expected {}",
row_count + 1,
fields.len(),
num_columns
)));
}
for (col_idx, field) in fields.into_iter().enumerate() {
columns_data[col_idx].push(field);
}
row_count += 1;
}
let mut dataframe_columns = indexmap::IndexMap::new();
for (col_idx, column_name) in headers.iter().enumerate() {
let raw_data = &columns_data[col_idx];
if self.infer_types {
let series = self.infer_and_convert_column(column_name, raw_data)?;
dataframe_columns.insert(column_name.clone(), series);
} else {
let string_data: Vec<Option<String>> = raw_data
.iter()
.map(|s| if s.is_empty() { None } else { Some(s.clone()) })
.collect();
let series = Series::new_string(column_name, string_data);
dataframe_columns.insert(column_name.clone(), series);
}
}
Ok(DataFrame::new(dataframe_columns))
}
fn parse_csv_line(&self, line: &str) -> Result<Vec<String>, VeloxxError> {
let mut fields = Vec::new();
let mut current_field = String::new();
let mut in_quotes = false;
let mut escaped = false;
let bytes = line.as_bytes();
for &byte in bytes {
if escaped {
current_field.push(byte as char);
escaped = false;
} else if byte == self.escape {
escaped = true;
} else if byte == self.quote {
in_quotes = !in_quotes;
} else if byte == self.delimiter && !in_quotes {
fields.push(current_field.trim().to_string());
current_field.clear();
} else {
current_field.push(byte as char);
}
}
fields.push(current_field.trim().to_string());
Ok(fields)
}
fn infer_and_convert_column(
&self,
name: &str,
raw_data: &[String],
) -> Result<Series, VeloxxError> {
if raw_data.is_empty() {
return Ok(Series::new_string(name, vec![]));
}
let non_empty_samples: Vec<_> = raw_data
.iter()
.filter(|s| !s.is_empty())
.take(100) .collect();
if non_empty_samples.is_empty() {
return Ok(Series::new_string(name, vec![]));
}
let mut all_i32 = true;
let mut i32_values = Vec::with_capacity(raw_data.len());
for value_str in raw_data {
if value_str.is_empty() {
i32_values.push(None);
} else {
match value_str.parse::<i32>() {
Ok(val) => i32_values.push(Some(val)),
Err(_) => {
all_i32 = false;
break;
}
}
}
}
if all_i32 {
return Ok(Series::new_i32(name, i32_values));
}
let mut all_f64 = true;
let mut f64_values = Vec::with_capacity(raw_data.len());
for value_str in raw_data {
if value_str.is_empty() {
f64_values.push(None);
} else {
match value_str.parse::<f64>() {
Ok(val) => f64_values.push(Some(val)),
Err(_) => {
all_f64 = false;
break;
}
}
}
}
if all_f64 {
return Ok(Series::new_f64(name, f64_values));
}
let mut all_bool = true;
let mut bool_values = Vec::with_capacity(raw_data.len());
for value_str in raw_data {
if value_str.is_empty() {
bool_values.push(None);
} else {
let lower = value_str.to_lowercase();
match lower.as_str() {
"true" | "t" | "yes" | "y" | "1" => bool_values.push(Some(true)),
"false" | "f" | "no" | "n" | "0" => bool_values.push(Some(false)),
_ => {
all_bool = false;
break;
}
}
}
}
if all_bool {
return Ok(Series::new_bool(name, bool_values));
}
let string_values: Vec<Option<String>> = raw_data
.iter()
.map(|s| if s.is_empty() { None } else { Some(s.clone()) })
.collect();
Ok(Series::new_string(name, string_values))
}
}
impl UltraFastCsvParser {
pub fn quick_read(path: &str) -> Result<DataFrame, VeloxxError> {
Self::new().read_file(path)
}
pub fn parse_string(content: &str) -> Result<DataFrame, VeloxxError> {
use std::io::Cursor;
let cursor = Cursor::new(content.as_bytes());
Self::new().read_from_reader(cursor)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
fn create_test_csv() -> NamedTempFile {
let mut file = NamedTempFile::new().unwrap();
writeln!(file, "id,name,value,score,active").unwrap();
writeln!(file, "1,Alice,10.5,95.5,true").unwrap();
writeln!(file, "2,Bob,20.0,87.2,false").unwrap();
writeln!(file, "3,Charlie,15.7,92.1,true").unwrap();
file.flush().unwrap();
file
}
#[test]
fn test_basic_csv_parsing() {
let file = create_test_csv();
let parser = UltraFastCsvParser::new();
let df = parser.read_file(file.path().to_str().unwrap()).unwrap();
assert_eq!(df.row_count(), 3);
assert_eq!(df.column_count(), 5);
let column_names = df.column_names();
assert!(column_names.contains(&"id".to_string()));
assert!(column_names.contains(&"name".to_string()));
assert!(column_names.contains(&"value".to_string()));
}
#[test]
fn test_type_inference() {
let file = create_test_csv();
let parser = UltraFastCsvParser::new().infer_types(true);
let df = parser.read_file(file.path().to_str().unwrap()).unwrap();
if let Some(Series::I32(_, _, _)) = df.get_column("id") {
} else {
panic!("ID column should be inferred as i32");
}
if let Some(Series::F64(_, _, _)) = df.get_column("value") {
} else {
panic!("Value column should be inferred as f64");
}
if let Some(Series::Bool(_, _, _)) = df.get_column("active") {
} else {
panic!("Active column should be inferred as bool");
}
}
#[test]
fn test_csv_line_parsing() {
let parser = UltraFastCsvParser::new();
let simple_line = "a,b,c";
let fields = parser.parse_csv_line(simple_line).unwrap();
assert_eq!(fields, vec!["a", "b", "c"]);
let quoted_line = r#""hello, world",test,"with ""quotes""" "#;
let fields = parser.parse_csv_line(quoted_line).unwrap();
assert_eq!(fields.len(), 3);
}
#[test]
fn test_string_parsing() {
let csv_content = "x,y\n1,2\n3,4\n";
let df = UltraFastCsvParser::parse_string(csv_content).unwrap();
assert_eq!(df.row_count(), 2);
assert_eq!(df.column_count(), 2);
}
}