use std::path::Path;
use crate::core::errors::DataProfilerError;
use crate::engines::streaming::IncrementalProfiler;
use crate::parsers::csv::CsvParserConfig;
use crate::types::{MetricPack, ProfileReport, QualityDimension};
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum InternalEngineType {
Arrow,
Incremental,
}
pub(crate) struct AdaptiveProfiler {
quality_dimensions: Option<Vec<QualityDimension>>,
metric_packs: Option<Vec<MetricPack>>,
csv_config: Option<CsvParserConfig>,
locale: Option<String>,
}
impl AdaptiveProfiler {
pub fn new() -> Self {
Self {
quality_dimensions: None,
metric_packs: None,
csv_config: None,
locale: None,
}
}
pub fn quality_dimensions(mut self, dims: Vec<QualityDimension>) -> Self {
self.quality_dimensions = Some(dims);
self
}
pub fn metric_packs(mut self, packs: Vec<MetricPack>) -> Self {
self.metric_packs = Some(packs);
self
}
pub fn csv_config(mut self, config: CsvParserConfig) -> Self {
self.csv_config = Some(config);
self
}
pub fn locale(mut self, locale: String) -> Self {
self.locale = Some(locale);
self
}
pub fn analyze_file(&self, file_path: &Path) -> Result<ProfileReport, DataProfilerError> {
if is_parquet(file_path) {
return crate::parsers::parquet::analyze_parquet_with_quality_dims(
file_path,
self.quality_dimensions.as_deref(),
);
}
let engine = self.select_engine(file_path);
log::info!("Engine selected: {:?}", engine);
let result = self.try_engine(&engine, file_path);
match result {
Ok(report) => Ok(report),
Err(primary_err) => {
let fallback = match engine {
InternalEngineType::Arrow => InternalEngineType::Incremental,
InternalEngineType::Incremental => InternalEngineType::Arrow,
};
log::warn!("Fallback: {:?} → {:?} — {}", engine, fallback, primary_err);
self.try_engine(&fallback, file_path)
.map_err(|fallback_err| DataProfilerError::AllEnginesFailed {
message: format!(
"All engines failed. Primary ({:?}): {}. Fallback ({:?}): {}.",
engine, primary_err, fallback, fallback_err
),
})
}
}
}
fn select_engine(&self, file_path: &Path) -> InternalEngineType {
use std::fs::File;
use std::io::{BufRead, BufReader};
let Ok(file) = File::open(file_path) else {
return InternalEngineType::Incremental;
};
let reader = BufReader::new(file);
let mut lines = reader.lines();
let delimiter = self
.csv_config
.as_ref()
.and_then(|c| c.delimiter)
.unwrap_or(b',') as char;
let header = match lines.next() {
Some(Ok(line)) => line,
_ => return InternalEngineType::Incremental,
};
let num_columns = header.split(delimiter).count();
if num_columns < 20 {
return InternalEngineType::Incremental;
}
let mut numeric_count = 0usize;
let mut total_fields = 0usize;
for line in lines.take(10) {
let Ok(line) = line else { break };
for val in line.split(delimiter) {
total_fields += 1;
if val.trim().parse::<f64>().is_ok() {
numeric_count += 1;
}
}
}
let numeric_ratio = if total_fields > 0 {
numeric_count as f64 / total_fields as f64
} else {
0.0
};
if numeric_ratio > 0.5 {
InternalEngineType::Arrow
} else {
InternalEngineType::Incremental
}
}
fn try_engine(
&self,
engine_type: &InternalEngineType,
file_path: &Path,
) -> Result<ProfileReport, DataProfilerError> {
match engine_type {
InternalEngineType::Arrow => {
use crate::engines::columnar::ArrowProfiler;
let mut profiler = ArrowProfiler::new();
if let Some(ref dims) = self.quality_dimensions {
profiler = profiler.quality_dimensions(dims.clone());
}
if let Some(ref packs) = self.metric_packs {
profiler = profiler.metric_packs(packs.clone());
}
if let Some(ref config) = self.csv_config {
profiler = profiler.csv_config(config.clone());
}
if let Some(ref l) = self.locale {
profiler = profiler.locale(l.clone());
}
profiler.analyze_csv_file(file_path)
}
InternalEngineType::Incremental => {
let mut profiler = IncrementalProfiler::new();
if let Some(ref dims) = self.quality_dimensions {
profiler = profiler.quality_dimensions(dims.clone());
}
if let Some(ref packs) = self.metric_packs {
profiler = profiler.metric_packs(packs.clone());
}
if let Some(ref config) = self.csv_config {
profiler = profiler.csv_config(config.clone());
}
if let Some(ref l) = self.locale {
profiler = profiler.locale(l.clone());
}
profiler.analyze_file(file_path)
}
}
}
}
fn is_parquet(file_path: &Path) -> bool {
file_path
.extension()
.map(|ext| ext.eq_ignore_ascii_case("parquet"))
.unwrap_or(false)
|| crate::parsers::parquet::is_parquet_file(file_path)
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn test_adaptive_profiler_basic() -> Result<()> {
let profiler = AdaptiveProfiler::new();
let mut temp_file = NamedTempFile::new()?;
writeln!(temp_file, "name,age,salary")?;
writeln!(temp_file, "Alice,25,50000")?;
writeln!(temp_file, "Bob,30,60000")?;
temp_file.flush()?;
let report = profiler.analyze_file(temp_file.path())?;
assert_eq!(report.column_profiles.len(), 3);
assert_eq!(report.execution.rows_processed, 2);
Ok(())
}
#[test]
fn test_fallback_mechanism() -> Result<()> {
let profiler = AdaptiveProfiler::new();
let mut temp_file = NamedTempFile::new()?;
writeln!(temp_file, "name,data")?;
writeln!(temp_file, "test,\"complex,data\"with\"quotes\"")?;
temp_file.flush()?;
let report = profiler.analyze_file(temp_file.path())?;
assert_eq!(report.column_profiles.len(), 2);
Ok(())
}
#[test]
fn test_select_engine_few_columns() {
let profiler = AdaptiveProfiler::new();
let mut temp_file = NamedTempFile::new().unwrap();
writeln!(temp_file, "a,b,c").unwrap();
writeln!(temp_file, "1,2,3").unwrap();
temp_file.flush().unwrap();
let engine = profiler.select_engine(temp_file.path());
assert_eq!(engine, InternalEngineType::Incremental);
}
}