pub mod partial;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use crate::core::errors::DataProfilerError;
use crate::core::progress::{ProgressEvent, ProgressSink};
use crate::core::sampling::{ChunkSize, SamplingStrategy};
use crate::core::stop_condition::StopCondition;
#[cfg(feature = "database")]
use crate::database::DatabaseConfig;
use crate::engines::adaptive::AdaptiveProfiler;
use crate::types::{
DataSource, FileFormat, ProfileReport, QualityDimension, RowCountEstimate, SchemaResult,
};
/// Selects which profiling engine backs an analysis run.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum EngineType {
    /// Choose an engine automatically based on the input format (default).
    #[default]
    Auto,
    /// Chunked streaming engine; honors sampling, stop conditions, and progress reporting.
    Incremental,
    /// Arrow-based columnar engine; honors the optional memory limit.
    Columnar,
}
/// Configuration shared by all profiling entry points.
///
/// Marked `#[non_exhaustive]`: construct via [`ProfilerConfig::default`] (or
/// struct update syntax) and adjust fields, or use the chainable builder
/// methods on `Profiler`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ProfilerConfig {
    /// Engine to run; `Auto` dispatches based on the detected file format.
    pub engine: EngineType,
    /// Chunk size used by the streaming engines.
    pub chunk_size: ChunkSize,
    /// Row sampling strategy applied by the streaming engines.
    pub sampling: SamplingStrategy,
    /// Optional memory budget in megabytes, for engines that support it.
    pub memory_limit_mb: Option<usize>,
    /// Force a specific file format instead of extension-based detection.
    pub format_override: Option<FileFormat>,
    /// Condition that stops profiling early (default: never).
    pub stop_condition: StopCondition,
    /// Minimum interval between progress events.
    pub progress_interval: Duration,
    /// CSV delimiter byte; when `None`, the delimiter is auto-detected from the file.
    pub csv_delimiter: Option<u8>,
    /// Flexible-parsing flag forwarded to the CSV parser configuration.
    pub csv_flexible: Option<bool>,
    /// Quality dimensions to compute; `None` means engine defaults.
    pub quality_dimensions: Option<Vec<QualityDimension>>,
    /// Database connection settings (only with the `database` feature).
    #[cfg(feature = "database")]
    pub database_config: Option<DatabaseConfig>,
}
impl Default for ProfilerConfig {
    /// Baseline configuration: automatic engine selection, adaptive chunking,
    /// no sampling, no memory limit, no format override, never stop early,
    /// progress events at most every 500 ms, and auto-detected CSV settings.
    fn default() -> Self {
        Self {
            engine: EngineType::Auto,
            chunk_size: ChunkSize::Adaptive,
            sampling: SamplingStrategy::None,
            memory_limit_mb: None,
            format_override: None,
            stop_condition: StopCondition::Never,
            progress_interval: Duration::from_millis(500),
            csv_delimiter: None,
            csv_flexible: None,
            quality_dimensions: None,
            #[cfg(feature = "database")]
            database_config: None,
        }
    }
}
/// High-level entry point for profiling files, data sources, and streams.
///
/// Built via [`Profiler::new`] or [`Profiler::with_config`] plus chainable
/// builder methods; progress events are routed through the configured sink.
pub struct Profiler {
    // Effective configuration consulted by every analysis entry point.
    config: ProfilerConfig,
    // Destination for progress events (none, callback, or channel).
    progress_sink: ProgressSink,
}
impl Profiler {
pub fn new() -> Self {
Self {
config: ProfilerConfig::default(),
progress_sink: ProgressSink::None,
}
}
pub fn with_config(config: ProfilerConfig) -> Self {
Self {
config,
progress_sink: ProgressSink::None,
}
}
    /// Selects the profiling engine (default: [`EngineType::Auto`]).
    pub fn engine(mut self, engine: EngineType) -> Self {
        self.config.engine = engine;
        self
    }
    /// Sets the chunk size used by streaming engines.
    pub fn chunk_size(mut self, chunk_size: ChunkSize) -> Self {
        self.config.chunk_size = chunk_size;
        self
    }
    /// Sets the row sampling strategy.
    pub fn sampling(mut self, strategy: SamplingStrategy) -> Self {
        self.config.sampling = strategy;
        self
    }
    /// Sets a memory budget in megabytes for engines that support one.
    pub fn memory_limit_mb(mut self, mb: usize) -> Self {
        self.config.memory_limit_mb = Some(mb);
        self
    }
    /// Stops profiling early once `condition` is met.
    pub fn stop_when(mut self, condition: StopCondition) -> Self {
        self.config.stop_condition = condition;
        self
    }
    /// Forces the file format, bypassing extension-based detection.
    pub fn format(mut self, format: FileFormat) -> Self {
        self.config.format_override = Some(format);
        self
    }
    /// Sets the CSV delimiter byte, disabling delimiter auto-detection.
    pub fn csv_delimiter(mut self, delimiter: u8) -> Self {
        self.config.csv_delimiter = Some(delimiter);
        self
    }
    /// Sets the flexible-parsing flag forwarded to the CSV parser.
    pub fn csv_flexible(mut self, flexible: bool) -> Self {
        self.config.csv_flexible = Some(flexible);
        self
    }
    /// Sets the full database configuration (requires the `database` feature).
    #[cfg(feature = "database")]
    pub fn database(mut self, config: DatabaseConfig) -> Self {
        self.config.database_config = Some(config);
        self
    }
    /// Configures the database from a connection string; all other database
    /// settings take their defaults (requires the `database` feature).
    #[cfg(feature = "database")]
    pub fn connection_string(mut self, conn: &str) -> Self {
        self.config.database_config = Some(DatabaseConfig {
            connection_string: conn.to_string(),
            ..Default::default()
        });
        self
    }
    /// Restricts quality assessment to the given dimensions.
    pub fn quality_dimensions(mut self, dims: Vec<QualityDimension>) -> Self {
        self.config.quality_dimensions = Some(dims);
        self
    }
    /// Sets the minimum interval between progress events.
    pub fn progress_interval(mut self, interval: Duration) -> Self {
        self.config.progress_interval = interval;
        self
    }
    /// Replaces the progress sink wholesale.
    pub fn progress_sink(mut self, sink: ProgressSink) -> Self {
        self.progress_sink = sink;
        self
    }
    /// Registers a callback invoked with each [`ProgressEvent`].
    pub fn on_progress<F>(mut self, callback: F) -> Self
    where
        F: Fn(ProgressEvent) + Send + Sync + 'static,
    {
        self.progress_sink = ProgressSink::Callback(Arc::new(callback));
        self
    }
    /// Sends progress events over a tokio channel (requires the
    /// `async-streaming` feature).
    #[cfg(feature = "async-streaming")]
    pub fn progress_channel(mut self, tx: tokio::sync::mpsc::Sender<ProgressEvent>) -> Self {
        self.progress_sink = ProgressSink::Channel(tx);
        self
    }
pub fn analyze_file<P: AsRef<Path>>(
&self,
file_path: P,
) -> Result<ProfileReport, DataProfilerError> {
let path = file_path.as_ref();
let format = self
.config
.format_override
.clone()
.unwrap_or_else(|| Self::detect_format(path));
match self.config.engine {
EngineType::Auto => self.run_auto(path, format),
EngineType::Incremental => self.run_incremental(path, format),
EngineType::Columnar => self.run_columnar(path, format),
}
}
pub fn analyze_source(&self, source: &DataSource) -> Result<ProfileReport, DataProfilerError> {
match source {
DataSource::File { path, .. } => self.analyze_file(Path::new(path)),
_ => Err(DataProfilerError::UnsupportedDataSource {
message: "Only File DataSource is supported in synchronous API. \
Use analyze_source_async() for Query/Stream sources."
.to_string(),
}),
}
}
    /// Profiles any [`DataSource`] variant supported by the current build.
    ///
    /// `File` sources reuse the synchronous path; `Query` sources are only
    /// available with the `database` feature. Every other variant yields an
    /// `UnsupportedDataSource` error.
    pub async fn analyze_source_async(
        &self,
        source: &DataSource,
    ) -> Result<ProfileReport, DataProfilerError> {
        match source {
            DataSource::File { path, .. } => self.analyze_file(Path::new(path)),
            #[cfg(feature = "database")]
            DataSource::Query { statement, .. } => self.analyze_query(statement).await,
            _ => Err(DataProfilerError::UnsupportedDataSource {
                message: "Unsupported DataSource variant for this configuration.".to_string(),
            }),
        }
    }
pub fn infer_schema<P: AsRef<Path>>(&self, path: P) -> Result<SchemaResult, DataProfilerError> {
let path = path.as_ref();
let format = self
.config
.format_override
.clone()
.unwrap_or_else(|| Self::detect_format(path));
partial::infer_schema_with_format(path, format)
}
pub fn quick_row_count<P: AsRef<Path>>(
&self,
path: P,
) -> Result<RowCountEstimate, DataProfilerError> {
let path = path.as_ref();
let format = self
.config
.format_override
.clone()
.unwrap_or_else(|| Self::detect_format(path));
partial::quick_row_count_with_format(path, format)
}
pub(crate) fn detect_format(file_path: &Path) -> FileFormat {
file_path
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| match ext.to_ascii_lowercase().as_str() {
"csv" | "tsv" | "txt" => FileFormat::Csv,
"json" => FileFormat::Json,
"jsonl" | "ndjson" => FileFormat::Jsonl,
"parquet" => FileFormat::Parquet,
other => FileFormat::Unknown(other.to_string()),
})
.unwrap_or(FileFormat::Csv) }
fn has_csv_config(&self) -> bool {
self.config.csv_delimiter.is_some() || self.config.csv_flexible.is_some()
}
fn csv_parser_config(&self) -> crate::parsers::csv::CsvParserConfig {
let mut csv_config = crate::parsers::csv::CsvParserConfig::default();
if let Some(d) = self.config.csv_delimiter {
csv_config = csv_config.with_delimiter(d);
}
if let Some(f) = self.config.csv_flexible {
csv_config.flexible = f;
}
csv_config
}
fn csv_config_for_file(&self, file_path: &Path) -> crate::parsers::csv::CsvParserConfig {
if self.has_csv_config() {
self.csv_parser_config()
} else {
let detected =
crate::parsers::csv::detect_delimiter_from_path(file_path).unwrap_or(b',');
crate::parsers::csv::CsvParserConfig::default().with_delimiter(detected)
}
}
fn run_auto(
&self,
file_path: &Path,
format: FileFormat,
) -> Result<ProfileReport, DataProfilerError> {
let dims = self.config.quality_dimensions.as_deref();
match format {
FileFormat::Json | FileFormat::Jsonl => {
crate::parsers::json::analyze_json_file_with_dimensions(
file_path,
&crate::parsers::json::JsonParserConfig::default(),
dims,
)
}
_ => {
let mut profiler = AdaptiveProfiler::new();
if let Some(d) = &self.config.quality_dimensions {
profiler = profiler.quality_dimensions(d.clone());
}
let csv_config = self.csv_config_for_file(file_path);
profiler = profiler.csv_config(csv_config);
profiler.analyze_file(file_path)
}
}
}
fn run_incremental(
&self,
file_path: &Path,
format: FileFormat,
) -> Result<ProfileReport, DataProfilerError> {
let dims = self.config.quality_dimensions.as_deref();
match format {
FileFormat::Json | FileFormat::Jsonl => {
return crate::parsers::json::analyze_json_file_with_dimensions(
file_path,
&crate::parsers::json::JsonParserConfig::default(),
dims,
);
}
FileFormat::Parquet => {
return crate::parsers::parquet::analyze_parquet_with_quality_dims(file_path, dims);
}
_ => {}
}
use crate::engines::streaming::IncrementalProfiler;
let mut profiler = IncrementalProfiler::new()
.chunk_size(self.config.chunk_size.clone())
.sampling(self.config.sampling.clone())
.stop_condition(self.config.stop_condition.clone())
.progress(self.progress_sink.clone(), self.config.progress_interval);
if let Some(d) = &self.config.quality_dimensions {
profiler = profiler.quality_dimensions(d.clone());
}
let csv_config = self.csv_config_for_file(file_path);
profiler = profiler.csv_config(csv_config);
profiler.analyze_file(file_path)
}
fn run_columnar(
&self,
file_path: &Path,
format: FileFormat,
) -> Result<ProfileReport, DataProfilerError> {
let dims = self.config.quality_dimensions.as_deref();
match format {
FileFormat::Parquet => {
return crate::parsers::parquet::analyze_parquet_with_quality_dims(file_path, dims);
}
FileFormat::Json | FileFormat::Jsonl => {
return crate::parsers::json::analyze_json_file_with_dimensions(
file_path,
&crate::parsers::json::JsonParserConfig::default(),
dims,
);
}
_ => {}
}
use crate::engines::columnar::ArrowProfiler;
let mut profiler = ArrowProfiler::new();
if let Some(mb) = self.config.memory_limit_mb {
profiler = profiler.memory_limit_mb(mb);
}
if let Some(d) = &self.config.quality_dimensions {
profiler = profiler.quality_dimensions(d.clone());
}
let csv_config = self.csv_config_for_file(file_path);
profiler = profiler.csv_config(csv_config);
profiler.analyze_csv_file(file_path)
}
}
impl Default for Profiler {
fn default() -> Self {
Self::new()
}
}
#[cfg(feature = "database")]
impl Profiler {
    /// Returns the configured database settings, or a descriptive error when
    /// no connection was ever supplied.
    ///
    /// Shared by both query entry points so the error text lives in one place.
    fn require_database_config(&self) -> Result<DatabaseConfig, DataProfilerError> {
        // `ok_or_else` defers building the error (and allocating its String)
        // until the config is actually missing; the previous `ok_or` paid
        // that cost on every call, even on success (clippy: or_fun_call).
        self.config.database_config.clone().ok_or_else(|| {
            DataProfilerError::DatabaseConfigError {
                message:
                    "No database connection configured. Use .database() or .connection_string() first."
                        .to_string(),
            }
        })
    }
    /// Runs `query` against the configured database and profiles the result
    /// set, including quality assessment.
    ///
    /// # Errors
    /// Fails when no database connection is configured, or when the query or
    /// profiling step fails.
    pub async fn analyze_query(&self, query: &str) -> Result<ProfileReport, DataProfilerError> {
        let config = self.require_database_config()?;
        crate::database::analyze_database(
            config,
            query,
            true,
            self.config.quality_dimensions.clone(),
        )
        .await
    }
    /// Same as [`Profiler::analyze_query`] but skips quality assessment.
    ///
    /// # Errors
    /// Fails when no database connection is configured, or when the query or
    /// profiling step fails.
    pub async fn analyze_query_no_quality(
        &self,
        query: &str,
    ) -> Result<ProfileReport, DataProfilerError> {
        let config = self.require_database_config()?;
        crate::database::analyze_database(config, query, false, None).await
    }
}
#[cfg(feature = "async-streaming")]
impl Profiler {
    /// Profiles an arbitrary async byte source with the async streaming
    /// engine, applying the configured chunk size, sampling, stop condition,
    /// progress reporting, memory limit, and quality dimensions.
    pub async fn profile_stream(
        &self,
        source: impl crate::engines::streaming::AsyncDataSource,
    ) -> Result<ProfileReport, DataProfilerError> {
        use crate::engines::streaming::AsyncStreamingProfiler;
        let mut profiler = AsyncStreamingProfiler::new()
            .chunk_size(self.config.chunk_size.clone())
            .sampling(self.config.sampling.clone())
            .stop_condition(self.config.stop_condition.clone())
            .progress(self.progress_sink.clone(), self.config.progress_interval);
        if let Some(mb) = self.config.memory_limit_mb {
            profiler = profiler.memory_limit_mb(mb);
        }
        if let Some(ref d) = self.config.quality_dimensions {
            profiler = profiler.quality_dimensions(d.clone());
        }
        profiler.analyze_stream(source).await
    }
    /// Profiles a local file asynchronously.
    ///
    /// Parquet is analyzed on a blocking worker thread (its parser is
    /// synchronous); CSV/JSON/JSONL are opened via `tokio::fs` and streamed
    /// through [`Profiler::profile_stream`]. Unknown extensions produce an
    /// `UnsupportedDataSource` error suggesting `.format()`.
    pub async fn profile_file<P: AsRef<Path>>(
        &self,
        file_path: P,
    ) -> Result<ProfileReport, DataProfilerError> {
        use crate::engines::streaming::async_source::AsyncSourceInfo;
        let path = file_path.as_ref();
        // Explicit override wins; otherwise detect from the extension.
        let format = self
            .config
            .format_override
            .clone()
            .unwrap_or_else(|| Self::detect_format(path));
        match format {
            FileFormat::Parquet => {
                // Owned copies so the 'static closure can move them.
                let path = path.to_path_buf();
                let dims = self.config.quality_dimensions.clone();
                tokio::task::spawn_blocking(move || {
                    crate::parsers::parquet::analyze_parquet_with_quality_dims(
                        &path,
                        dims.as_deref(),
                    )
                })
                .await
                .map_err(|e| DataProfilerError::StreamingError {
                    message: format!("Blocking task failed: {e}"),
                })?
            }
            FileFormat::Csv | FileFormat::Json | FileFormat::Jsonl => {
                // Metadata first: its length becomes the stream's size hint.
                let metadata =
                    tokio::fs::metadata(path)
                        .await
                        .map_err(|e| DataProfilerError::IoError {
                            message: format!("{}: {e}", path.display()),
                        })?;
                let file =
                    tokio::fs::File::open(path)
                        .await
                        .map_err(|e| DataProfilerError::IoError {
                            message: format!("{}: {e}", path.display()),
                        })?;
                let info = AsyncSourceInfo {
                    label: path.display().to_string(),
                    format,
                    size_hint: Some(metadata.len()),
                    source_system: Some(crate::types::StreamSourceSystem::Custom("file".into())),
                    ..Default::default()
                };
                self.profile_stream((file, info)).await
            }
            FileFormat::Unknown(ref ext) => Err(DataProfilerError::UnsupportedDataSource {
                message: format!(
                    "Unknown file format '.{ext}'. Use .format() to override detection."
                ),
            }),
        }
    }
    /// Infers a schema from an async byte source without a full profile.
    pub async fn infer_schema_stream(
        &self,
        source: impl crate::engines::streaming::AsyncDataSource,
    ) -> Result<SchemaResult, DataProfilerError> {
        partial::infer_schema_stream(source).await
    }
    /// Estimates the row count of an async byte source without a full profile.
    pub async fn quick_row_count_stream(
        &self,
        source: impl crate::engines::streaming::AsyncDataSource,
    ) -> Result<RowCountEstimate, DataProfilerError> {
        partial::quick_row_count_stream(source).await
    }
}
#[cfg(feature = "parquet-async")]
impl Profiler {
    /// Profiles a file served over HTTP(S).
    ///
    /// The format comes from `format_override` when set; otherwise it is
    /// detected from the last path segment of the URL (query string and
    /// fragment stripped first). Parquet uses the range-request-based async
    /// reader; CSV/JSON/JSONL are downloaded with `reqwest` and streamed
    /// through [`Profiler::profile_stream`].
    pub async fn profile_url(&self, url: &str) -> Result<ProfileReport, DataProfilerError> {
        use crate::engines::streaming::async_source::{AsyncSourceInfo, ReqwestSource};
        let format = self.config.format_override.clone().unwrap_or_else(|| {
            // Strip `?query` then `#fragment`, keep the final path segment,
            // and reuse the extension-based detector on it.
            let without_query = url.split('?').next().unwrap_or(url);
            let without_fragment = without_query.split('#').next().unwrap_or(without_query);
            let last_segment = without_fragment.rsplit('/').next().unwrap_or("");
            Self::detect_format(Path::new(last_segment))
        });
        match format {
            FileFormat::Parquet => {
                crate::parsers::parquet_async::analyze_parquet_async_http_dims(
                    url,
                    &crate::parsers::parquet::ParquetConfig::default(),
                    self.config.quality_dimensions.clone(),
                )
                .await
            }
            FileFormat::Csv | FileFormat::Json | FileFormat::Jsonl => {
                let response =
                    reqwest::get(url)
                        .await
                        .map_err(|e| DataProfilerError::StreamingError {
                            message: format!("HTTP request failed: {e}"),
                        })?;
                // Surface non-2xx responses as errors instead of profiling an
                // HTML error page.
                let status = response.status();
                if !status.is_success() {
                    return Err(DataProfilerError::StreamingError {
                        message: format!("HTTP {status} for {url}"),
                    });
                }
                // Content-Length, when present, seeds the progress size hint.
                let size_hint = response.content_length();
                let source = ReqwestSource::new(
                    response,
                    AsyncSourceInfo {
                        label: url.to_string(),
                        format,
                        size_hint,
                        source_system: Some(crate::types::StreamSourceSystem::Http),
                        ..Default::default()
                    },
                );
                self.profile_stream(source).await
            }
            FileFormat::Unknown(ref ext) => Err(DataProfilerError::UnsupportedDataSource {
                message: format!(
                    "Unknown format '.{ext}' in URL. Use .format() to specify the data format."
                ),
            }),
        }
    }
}
/// Convenience helper: profiles `file_path` with default settings and returns
/// the overall quality score, or `0.0` when no score is available.
pub fn quick_quality_check<P: AsRef<Path>>(file_path: P) -> Result<f64, DataProfilerError> {
    let report = Profiler::new().analyze_file(file_path)?;
    Ok(report.quality_score().unwrap_or(0.0))
}
/// Convenience helper: profiles a [`DataSource`] with default settings and
/// returns the overall quality score, or `0.0` when no score is available.
pub fn quick_quality_check_source(source: &DataSource) -> Result<f64, DataProfilerError> {
    let report = Profiler::new().analyze_source(source)?;
    Ok(report.quality_score().unwrap_or(0.0))
}