use csv::ReaderBuilder;
use std::io::Read;
use std::path::Path;
use crate::core::errors::DataProfilerError;
use crate::core::profile_builder;
use crate::core::report_assembler::ReportAssembler;
use crate::core::streaming_stats::StreamingColumnCollection;
use crate::types::{
ColumnProfile, DataSource, ExecutionMetadata, FileFormat, ProfileReport, QualityDimension,
};
/// Options controlling how CSV input is parsed.
#[derive(Debug, Clone)]
pub struct CsvParserConfig {
    /// Accept rows whose field count differs from the header ("ragged" rows).
    pub flexible: bool,
    /// Field delimiter byte; `None` means auto-detect from a sample.
    pub delimiter: Option<u8>,
    /// Byte used to quote fields.
    pub quote_char: u8,
    /// Trim surrounding whitespace from every field.
    pub trim_whitespace: bool,
    /// Treat the first row as a header row.
    pub has_header: bool,
    /// Stop after this many data rows; `None` reads everything.
    pub max_rows: Option<usize>,
}

impl Default for CsvParserConfig {
    /// Lenient defaults: ragged rows tolerated, delimiter auto-detected,
    /// standard double-quote quoting, no trimming, header expected, no row cap.
    fn default() -> Self {
        CsvParserConfig {
            delimiter: None,
            max_rows: None,
            quote_char: b'"',
            trim_whitespace: false,
            flexible: true,
            has_header: true,
        }
    }
}

impl CsvParserConfig {
    /// Like the default configuration, but rejects ragged rows.
    pub fn strict() -> Self {
        CsvParserConfig {
            flexible: false,
            ..CsvParserConfig::default()
        }
    }

    /// Builder: force a specific delimiter byte instead of auto-detecting.
    pub fn with_delimiter(self, delimiter: u8) -> Self {
        Self {
            delimiter: Some(delimiter),
            ..self
        }
    }

    /// Builder: declare whether the first row is a header.
    pub fn has_header(self, yes: bool) -> Self {
        Self {
            has_header: yes,
            ..self
        }
    }

    /// Builder: cap the number of data rows that will be read.
    pub fn max_rows(self, max: Option<usize>) -> Self {
        Self {
            max_rows: max,
            ..self
        }
    }
}
/// Count how many fields `line` splits into with `del` as the delimiter,
/// honoring double-quote quoting (a doubled `""` is an escaped quote and
/// never toggles the quoted state). An empty line counts as one field.
fn count_fields(line: &str, del: char) -> usize {
    let mut fields = 1;
    let mut quoted = false;
    let mut it = line.chars().peekable();
    while let Some(ch) = it.next() {
        match ch {
            '"' => {
                if it.peek() == Some(&'"') {
                    // Escaped quote: swallow the second half of the pair.
                    it.next();
                } else {
                    quoted = !quoted;
                }
            }
            c if c == del && !quoted => fields += 1,
            _ => {}
        }
    }
    fields
}

/// Guess the delimiter of CSV data by sampling up to the first 4096 bytes.
///
/// Each candidate (`,`, `;`, tab, `|`) is scored over at most the first five
/// lines: "consistency" is how many sampled lines share the most common field
/// count, with ties broken by the (integer) average field count. A candidate
/// that actually splits lines (average > 1 field) always outranks one that
/// does not. Falls back to `,` when nothing stands out.
pub fn detect_delimiter<R: Read>(reader: R) -> std::io::Result<u8> {
    let mut head = Vec::new();
    reader.take(4096).read_to_end(&mut head)?;
    let text = String::from_utf8_lossy(&head);
    let sample: Vec<&str> = text.lines().take(5).collect();

    let mut winner = b',';
    let mut best_consistency = 0usize;
    let mut best_avg = 0usize;
    for &candidate in &[b',', b';', b'\t', b'|'] {
        let ch = candidate as char;
        let counts: Vec<usize> = sample.iter().map(|l| count_fields(l, ch)).collect();
        let mut freq = std::collections::HashMap::new();
        for &c in &counts {
            *freq.entry(c).or_insert(0usize) += 1;
        }
        let consistency = freq.values().max().copied().unwrap_or(0);
        let avg = if sample.is_empty() {
            0
        } else {
            counts.iter().sum::<usize>() / sample.len()
        };
        // Splitting candidates beat non-splitting ones; otherwise prefer
        // higher consistency, then higher average width.
        let take_it = if avg > 1 && best_avg <= 1 {
            true
        } else if avg <= 1 && best_avg > 1 {
            false
        } else {
            consistency > best_consistency
                || (consistency == best_consistency && avg > best_avg)
        };
        if take_it {
            best_consistency = consistency;
            best_avg = avg;
            winner = candidate;
        }
    }
    Ok(winner)
}
pub fn detect_delimiter_from_path(path: &Path) -> std::io::Result<u8> {
let file = std::fs::File::open(path)?;
detect_delimiter(std::io::BufReader::new(file))
}
/// Stream CSV data from `reader` and build per-column streaming statistics.
///
/// When `config.delimiter` is `None`, up to 4096 bytes are buffered to
/// auto-detect the delimiter, then stitched back in front of the unread
/// remainder so no input is lost.
///
/// Returns `(column_profiles, streaming_stats, rows_read, header_names)`.
///
/// # Errors
/// Propagates I/O and CSV parse errors (e.g. ragged rows when
/// `config.flexible` is false) as [`DataProfilerError`].
pub fn analyze_csv_from_reader<R: Read>(
    reader: R,
    config: &CsvParserConfig,
) -> Result<
    (
        Vec<ColumnProfile>,
        StreamingColumnCollection,
        usize,
        Vec<String>,
    ),
    DataProfilerError,
> {
    let mut csv_builder = ReaderBuilder::new();
    csv_builder.has_headers(config.has_header);
    csv_builder.flexible(config.flexible);
    csv_builder.quote(config.quote_char);
    if config.trim_whitespace {
        csv_builder.trim(csv::Trim::All);
    }
    let mut actual_delimiter = config.delimiter;
    // No delimiter configured: read a preamble, detect on it, then chain the
    // preamble back in front of the rest of the reader so parsing sees the
    // full stream. Detection failure falls back to a comma.
    let boxed_reader: Box<dyn std::io::Read + '_> = if config.delimiter.is_none() {
        let mut preamble = Vec::new();
        let mut take = reader.take(4096);
        take.read_to_end(&mut preamble)?;
        let detected = detect_delimiter(std::io::Cursor::new(&preamble)).unwrap_or(b',');
        actual_delimiter = Some(detected);
        Box::new(std::io::Cursor::new(preamble).chain(take.into_inner()))
    } else {
        Box::new(reader)
    };
    if let Some(delim) = actual_delimiter {
        csv_builder.delimiter(delim);
    }
    let mut csv_reader = csv_builder.from_reader(boxed_reader);
    // With no header row, synthesize `column_N` names from the width of the
    // first record. NOTE(review): relies on the csv crate still yielding that
    // first record from `records()` when `has_headers(false)` is set.
    let header_names: Vec<String> = if config.has_header {
        let headers = csv_reader.headers()?;
        headers.iter().map(|h| h.to_string()).collect()
    } else {
        let headers = csv_reader.headers()?;
        (0..headers.len()).map(|i| format!("column_{i}")).collect()
    };
    let mut column_stats = StreamingColumnCollection::new();
    let mut rows_read = 0;
    for result in csv_reader.records() {
        // Row cap applies to data rows only (header not counted).
        if let Some(max) = config.max_rows
            && rows_read >= max
        {
            break;
        }
        let record = result?;
        let mut values: Vec<String> = record.iter().map(|s| s.to_string()).collect();
        // Normalize ragged rows to the header width: pad short rows with
        // empty strings, drop surplus fields from long ones.
        let header_len = header_names.len();
        if values.len() < header_len {
            values.resize(header_len, String::new());
        } else if values.len() > header_len {
            values.truncate(header_len);
        }
        column_stats.process_record(&header_names, values);
        rows_read += 1;
    }
    let profiles = profile_builder::profiles_from_streaming(&column_stats, false, false, None);
    Ok((profiles, column_stats, rows_read, header_names))
}
/// Profile a CSV file and produce a full [`ProfileReport`].
///
/// Convenience wrapper around [`analyze_csv_file_with_dimensions`] that
/// passes `None` for the quality-dimension filter.
pub fn analyze_csv_file(
    file_path: &Path,
    config: &CsvParserConfig,
) -> Result<ProfileReport, DataProfilerError> {
    analyze_csv_file_with_dimensions(file_path, config, None)
}
/// Profile a CSV file and assemble a [`ProfileReport`], optionally
/// restricting the quality assessment to `quality_dimensions`.
///
/// # Errors
/// Returns [`DataProfilerError::FileNotFound`] when the path does not exist,
/// and propagates other I/O or CSV parse errors.
pub fn analyze_csv_file_with_dimensions(
    file_path: &Path,
    config: &CsvParserConfig,
    quality_dimensions: Option<&[QualityDimension]>,
) -> Result<ProfileReport, DataProfilerError> {
    let metadata = std::fs::metadata(file_path).map_err(|e| map_io_error(file_path, e))?;
    // Timer covers the whole scan, including file open and parsing.
    let start = std::time::Instant::now();
    let file = std::fs::File::open(file_path).map_err(|e| map_io_error(file_path, e))?;
    let buf_reader = std::io::BufReader::new(file);
    let (column_profiles, column_stats, rows_read, header_names) =
        analyze_csv_from_reader(buf_reader, config)?;
    let file_source = DataSource::File {
        path: file_path.display().to_string(),
        format: FileFormat::Csv,
        size_bytes: metadata.len(),
        modified_at: None,
        parquet_metadata: None,
    };
    // Header-only (or empty) input: emit a report with zero columns and
    // skip the quality assessment entirely.
    if column_profiles.is_empty() && rows_read == 0 {
        return Ok(ReportAssembler::new(
            file_source,
            ExecutionMetadata::new(0, header_names.len(), start.elapsed().as_millis()),
        )
        .skip_quality()
        .build());
    }
    let sample_columns = profile_builder::quality_check_samples(&column_stats);
    let scan_time_ms = start.elapsed().as_millis();
    let num_columns = column_profiles.len();
    let mut assembler = ReportAssembler::new(
        file_source,
        ExecutionMetadata::new(rows_read, num_columns, scan_time_ms),
    )
    .columns(column_profiles)
    .with_quality_data(sample_columns);
    if let Some(dims) = quality_dimensions {
        assembler = assembler.with_requested_dimensions(dims.to_vec());
    }
    Ok(assembler.build())
}
/// Translate an I/O error into the crate's error type, mapping a missing
/// file to the dedicated `FileNotFound` variant so callers see the path.
fn map_io_error(file_path: &Path, e: std::io::Error) -> DataProfilerError {
    match e.kind() {
        std::io::ErrorKind::NotFound => DataProfilerError::FileNotFound {
            path: file_path.display().to_string(),
        },
        _ => DataProfilerError::from(e),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::DataType;
    use std::io::{Cursor, Write};
    use tempfile::NamedTempFile;

    // Helper: write `content` to a fresh temp file and return its handle
    // (the file is deleted when the handle drops).
    fn write_csv(content: &str) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        write!(f, "{}", content).unwrap();
        f.flush().unwrap();
        f
    }

    // --- analyze_csv_from_reader: in-memory parsing behavior ---

    #[test]
    fn test_analyze_csv_from_reader_basic() {
        let data = b"name,age\nAlice,25\nBob,30\n";
        let cursor = Cursor::new(data.as_ref());
        let config = CsvParserConfig::default();
        let (profiles, _stats, rows, _) = analyze_csv_from_reader(cursor, &config).unwrap();
        assert_eq!(rows, 2);
        assert_eq!(profiles.len(), 2);
        let age = profiles.iter().find(|p| p.name == "age").unwrap();
        assert_eq!(age.total_count, 2);
        assert_eq!(age.data_type, DataType::Integer);
    }

    #[test]
    fn test_analyze_csv_from_reader_with_max_rows() {
        let data = b"val\n1\n2\n3\n4\n5\n";
        let cursor = Cursor::new(data.as_ref());
        let config = CsvParserConfig::default().max_rows(Some(3));
        let (_profiles, _stats, rows, _) = analyze_csv_from_reader(cursor, &config).unwrap();
        assert_eq!(rows, 3);
    }

    // Strict mode must surface ragged rows as errors.
    #[test]
    fn test_analyze_csv_from_reader_strict_rejects_ragged() {
        let data = b"a,b\n1,2\n3,4,5\n";
        let cursor = Cursor::new(data.as_ref());
        let config = CsvParserConfig::strict();
        assert!(analyze_csv_from_reader(cursor, &config).is_err());
    }

    // Flexible (default) mode pads/truncates ragged rows instead of failing.
    #[test]
    fn test_analyze_csv_from_reader_flexible_handles_ragged() {
        let data = b"a,b\n1,2\n3,4,5\n6\n";
        let cursor = Cursor::new(data.as_ref());
        let config = CsvParserConfig::default();
        let (profiles, _stats, rows, _) = analyze_csv_from_reader(cursor, &config).unwrap();
        assert_eq!(rows, 3);
        assert!(!profiles.is_empty());
    }

    #[test]
    fn test_analyze_csv_from_reader_custom_delimiter() {
        let data = b"a;b\n1;2\n3;4\n";
        let cursor = Cursor::new(data.as_ref());
        let config = CsvParserConfig::default().with_delimiter(b';');
        let (profiles, _stats, rows, _) = analyze_csv_from_reader(cursor, &config).unwrap();
        assert_eq!(rows, 2);
        assert_eq!(profiles.len(), 2);
    }

    // --- analyze_csv_file: end-to-end report assembly ---

    #[test]
    fn test_analyze_csv_file_returns_quality_report() {
        let csv = write_csv("x,y\n1,a\n2,b\n3,c\n");
        let config = CsvParserConfig::default();
        let report = analyze_csv_file(csv.path(), &config).unwrap();
        assert_eq!(report.column_profiles.len(), 2);
        assert_eq!(report.execution.rows_processed, 3);
        assert!(report.quality_score().unwrap() >= 0.0);
        assert!(report.quality_score().unwrap() <= 100.0);
    }

    // Delimiter auto-detection kicks in when config.delimiter is None.
    #[test]
    fn test_csv_auto_detects_delimiter_from_reader() {
        let data = b"name;age\nAlice;25\nBob;30\n";
        let cursor = Cursor::new(data.as_ref());
        let config = CsvParserConfig::default();
        let (profiles, _stats, rows, _) = analyze_csv_from_reader(cursor, &config).unwrap();
        assert_eq!(rows, 2);
        assert_eq!(profiles.len(), 2);
        let age = profiles.iter().find(|p| p.name == "age").unwrap();
        assert_eq!(age.total_count, 2);
    }

    #[test]
    fn test_analyze_csv_basic() {
        let csv = write_csv("name,age\nAlice,25\nBob,30\n");
        let config = CsvParserConfig::default();
        let report = analyze_csv_file(csv.path(), &config).unwrap();
        let profiles = &report.column_profiles;
        assert_eq!(profiles.len(), 2);
        let names: Vec<&str> = profiles.iter().map(|p| p.name.as_str()).collect();
        assert!(names.contains(&"name"));
        assert!(names.contains(&"age"));
        let age = profiles.iter().find(|p| p.name == "age").unwrap();
        assert_eq!(age.total_count, 2);
        assert_eq!(age.null_count, 0);
    }

    // Empty fields are counted as nulls in the column profiles.
    #[test]
    fn test_analyze_csv_with_nulls() {
        let csv = write_csv("name,age,email\nAlice,25,a@b.com\nBob,,\nCharlie,30,c@d.com\n");
        let config = CsvParserConfig::default();
        let report = analyze_csv_file(csv.path(), &config).unwrap();
        let profiles = &report.column_profiles;
        let age = profiles.iter().find(|p| p.name == "age").unwrap();
        assert_eq!(age.total_count, 3);
        assert_eq!(age.null_count, 1);
        let email = profiles.iter().find(|p| p.name == "email").unwrap();
        assert_eq!(email.null_count, 1);
    }

    #[test]
    fn test_analyze_csv_file_returns_quality_report_legacy() {
        let csv = write_csv("x,y\n1,a\n2,b\n3,c\n");
        let config = CsvParserConfig::default();
        let report = analyze_csv_file(csv.path(), &config).unwrap();
        assert_eq!(report.column_profiles.len(), 2);
        assert_eq!(report.execution.rows_processed, 3);
        assert_eq!(report.execution.columns_detected, 2);
        assert!(report.quality_score().unwrap() >= 0.0);
        assert!(report.quality_score().unwrap() <= 100.0);
    }

    // Header-only input produces an empty report (no columns, no rows).
    #[test]
    fn test_analyze_csv_file_empty_file() {
        let csv = write_csv("name,age\n");
        let config = CsvParserConfig::default();
        let report = analyze_csv_file(csv.path(), &config).unwrap();
        assert_eq!(report.column_profiles.len(), 0);
        assert_eq!(report.execution.rows_processed, 0);
    }

    #[test]
    fn test_analyze_csv_with_max_rows_small_file() {
        let csv = write_csv("val\n1\n2\n3\n4\n5\n");
        let config = CsvParserConfig::default();
        let report = analyze_csv_file(csv.path(), &config).unwrap();
        assert_eq!(report.execution.rows_processed, 5);
        assert_eq!(report.execution.columns_detected, 1);
        assert!((report.execution.sampling_ratio.unwrap_or(1.0) - 1.0).abs() < 0.01);
    }

    #[test]
    fn test_analyze_csv_numeric_types_detected() {
        let csv = write_csv("int_col,float_col,str_col\n1,1.5,hello\n2,2.5,world\n3,3.5,foo\n");
        let config = CsvParserConfig::default();
        let report = analyze_csv_file(csv.path(), &config).unwrap();
        let profiles = &report.column_profiles;
        let int_col = profiles.iter().find(|p| p.name == "int_col").unwrap();
        assert!(
            matches!(int_col.data_type, DataType::Integer),
            "Expected Integer, got {:?}",
            int_col.data_type
        );
        let float_col = profiles.iter().find(|p| p.name == "float_col").unwrap();
        assert!(
            matches!(float_col.data_type, DataType::Float),
            "Expected Float, got {:?}",
            float_col.data_type
        );
        let str_col = profiles.iter().find(|p| p.name == "str_col").unwrap();
        assert!(
            matches!(str_col.data_type, DataType::String),
            "Expected String, got {:?}",
            str_col.data_type
        );
    }

    #[test]
    fn test_analyze_csv_strict_rejects_ragged_rows() {
        let csv = write_csv("a,b\n1,2\n3,4,5\n");
        let config = CsvParserConfig::strict();
        assert!(analyze_csv_file(csv.path(), &config).is_err());
    }

    #[test]
    fn test_analyze_csv_file_handles_ragged_rows() {
        let csv = write_csv("a,b\n1,2\n3,4,5\n6\n");
        let config = CsvParserConfig::default();
        let report = analyze_csv_file(csv.path(), &config).unwrap();
        assert!(!report.column_profiles.is_empty());
    }

    #[test]
    fn test_analyze_csv_with_quality_metrics() {
        let csv = write_csv(
            "name,age\n\
             Alice,25\nBob,\nCharlie,30\nDave,\nEve,28\n\
             Frank,\nGrace,35\nHeidi,40\nIvan,\nJudy,22\n",
        );
        let config = CsvParserConfig::default();
        let report = analyze_csv_file(csv.path(), &config).unwrap();
        let age = report
            .column_profiles
            .iter()
            .find(|p| p.name == "age")
            .unwrap();
        assert_eq!(age.null_count, 4);
        assert_eq!(age.total_count, 10);
        assert!(report.quality_score().unwrap() >= 0.0);
    }

    // --- detect_delimiter: one case per supported candidate ---

    #[test]
    fn test_detect_delimiter_comma() {
        let data = b"name,age,city\nAlice,25,NYC\nBob,30,London\n";
        assert_eq!(detect_delimiter(Cursor::new(data.as_ref())).unwrap(), b',');
    }

    #[test]
    fn test_detect_delimiter_semicolon() {
        let data = b"name;age;city\nAlice;25;NYC\nBob;30;London\n";
        assert_eq!(detect_delimiter(Cursor::new(data.as_ref())).unwrap(), b';');
    }

    #[test]
    fn test_detect_delimiter_pipe() {
        let data = b"name|age|city\nAlice|25|NYC\nBob|30|London\n";
        assert_eq!(detect_delimiter(Cursor::new(data.as_ref())).unwrap(), b'|');
    }

    #[test]
    fn test_detect_delimiter_tab() {
        let data = b"name\tage\tcity\nAlice\t25\tNYC\nBob\t30\tLondon\n";
        assert_eq!(detect_delimiter(Cursor::new(data.as_ref())).unwrap(), b'\t');
    }

    #[test]
    fn test_detect_delimiter_from_path_semicolon() {
        let csv = write_csv("id;name;salary\n1;Alice;50000\n2;Bob;60000\n");
        assert_eq!(detect_delimiter_from_path(csv.path()).unwrap(), b';');
    }

    // Delimiters inside quoted fields must not confuse detection.
    #[test]
    fn test_detect_delimiter_quoted_fields() {
        let data = b"name;desc;val\n\"hello;world\";foo;1\nbar;baz;2\n";
        assert_eq!(detect_delimiter(Cursor::new(data.as_ref())).unwrap(), b';');
    }
}