use anyhow::Result;
use std::path::Path;
use std::sync::Arc;
use dataprof::{
ChunkSize, EngineType, MetricPack, Profiler, ProgressEvent, ProgressSink,
core::{DataprofConfig, sampling::SamplingStrategy},
types::ProfileReport,
};
/// User-supplied analysis options, typically collected from the CLI.
///
/// All fields are optional overrides; `Default` yields "no overrides".
#[derive(Default)]
pub struct AnalysisOptions {
    /// When true, print a live single-line progress display during streaming analysis.
    pub progress: bool,
    /// Fixed chunk size for streaming; `None` selects adaptive chunking.
    pub chunk_size: Option<usize>,
    /// Explicit configuration file path; `None` falls back to config discovery.
    pub config: Option<std::path::PathBuf>,
    /// Random-sample size; `None` analyzes all rows.
    pub sample: Option<usize>,
    /// Output verbosity override; `None` keeps the configured verbosity.
    pub verbosity: Option<u8>,
    /// Metric packs to enable; `None` uses the profiler's defaults.
    pub metric_packs: Option<Vec<MetricPack>>,
    /// Locale identifier for locale-aware profiling; `None` uses the default.
    pub locale: Option<String>,
}
/// Builds a configured streaming [`Profiler`] from CLI options and a resolved
/// [`DataprofConfig`].
pub struct ProfilerBuilder {
    options: AnalysisOptions,
    // NOTE(review): `config` is stored but not yet read by any method in this
    // file — kept (with the dead_code allowance) presumably for future use.
    #[allow(dead_code)]
    config: DataprofConfig,
}
impl ProfilerBuilder {
    /// Create a builder from user options and a resolved configuration.
    pub fn new(options: AnalysisOptions, config: DataprofConfig) -> Self {
        Self { options, config }
    }

    /// Construct an incremental (streaming) profiler configured from the
    /// stored options. The file path is currently unused but kept for
    /// interface stability.
    pub fn build_streaming(&self, _file_path: &Path) -> Result<Profiler> {
        // Fixed chunking when the user asked for it, adaptive otherwise.
        let chunking = match self.options.chunk_size {
            Some(size) => ChunkSize::Fixed(size),
            None => ChunkSize::Adaptive,
        };
        let mut profiler = Profiler::new()
            .engine(EngineType::Incremental)
            .chunk_size(chunking);

        if let Some(size) = self.options.sample {
            profiler = profiler.sampling(SamplingStrategy::Random { size });
        }
        if let Some(packs) = self.options.metric_packs.as_ref() {
            profiler = profiler.metric_packs(packs.clone());
        }
        if let Some(locale) = self.options.locale.as_ref() {
            profiler = profiler.locale(locale);
        }
        if self.options.progress {
            // Live progress: rewrite one console line per processed chunk.
            let on_event = Arc::new(|event: ProgressEvent| {
                if let ProgressEvent::ChunkProcessed {
                    rows_processed,
                    percentage,
                    processing_speed,
                    ..
                } = event
                {
                    print!(
                        "\rProcessing: {:.1}% ({} rows, {:.1} rows/sec)",
                        percentage.unwrap_or(0.0),
                        rows_processed,
                        processing_speed
                    );
                    // `print!` does not flush; flush so the line updates in place.
                    let _ = std::io::Write::flush(&mut std::io::stdout());
                }
            });
            profiler = profiler.progress_sink(ProgressSink::Callback(on_event));
        }
        Ok(profiler)
    }
}
/// Analyze a data file with the given options, dispatching on file type.
///
/// JSON and Parquet files go to their dedicated analyzers; everything else is
/// analyzed with the streaming profiler, falling back to the robust CSV parser
/// if profiler construction or streaming analysis fails.
///
/// # Errors
/// Returns an error only when the fallback CSV analysis itself fails (or the
/// JSON/Parquet analyzers fail for their respective file types).
pub fn analyze_file_with_options(
    file_path: &Path,
    options: AnalysisOptions,
) -> Result<ProfileReport> {
    // Load configuration: an explicit path takes precedence; a failed explicit
    // load is logged and degrades to defaults rather than aborting.
    let mut config = if let Some(config_path) = &options.config {
        match DataprofConfig::load_from_file(config_path) {
            Ok(cfg) => {
                log::info!("Loaded configuration from: {}", config_path.display());
                cfg
            }
            Err(e) => {
                log::warn!(
                    "Failed to load config from {}: {}. Using defaults.",
                    config_path.display(),
                    e
                );
                DataprofConfig::default()
            }
        }
    } else {
        DataprofConfig::load_with_discovery()
    };

    // CLI verbosity overrides whatever the configuration specified.
    if let Some(verbosity) = options.verbosity {
        config.output.verbosity = verbosity;
    }

    // File-type dispatch: JSON and Parquet have dedicated analyzers.
    if super::commands::is_json_file(file_path) {
        return Ok(dataprof::parsers::json::analyze_json_file(
            file_path,
            &dataprof::parsers::json::JsonParserConfig::default(),
        )?);
    }
    if super::commands::is_parquet_file(file_path) {
        return Ok(dataprof::analyze_parquet_with_quality(file_path)?);
    }

    // Read verbosity before moving `config` into the builder — this avoids
    // cloning the whole config just to keep one u8 field around.
    let verbosity = config.output.verbosity;
    let builder = ProfilerBuilder::new(options, config);

    match builder.build_streaming(file_path) {
        Ok(profiler) => match profiler.analyze_file(file_path) {
            Ok(report) => {
                // Terminate the in-place progress line with a newline.
                if builder.options.progress {
                    println!();
                }
                Ok(report)
            }
            Err(e) => fallback_csv_analysis(file_path, verbosity, "Streaming analysis failed", &e),
        },
        Err(e) => {
            fallback_csv_analysis(file_path, verbosity, "Profiler initialization failed", &e)
        }
    }
}

/// Report a streaming-path failure (only at verbosity >= 2) and retry the file
/// with the robust CSV parser.
fn fallback_csv_analysis(
    file_path: &Path,
    verbosity: u8,
    context: &str,
    error: &dyn std::fmt::Display,
) -> Result<ProfileReport> {
    if verbosity >= 2 {
        eprintln!("{}: {}. Trying robust parser...", context, error);
    }
    Ok(dataprof::parsers::csv::analyze_csv_file(
        file_path,
        &dataprof::parsers::csv::CsvParserConfig::default(),
    )?)
}