use crate::core::errors::DataProfilerError;
use std::collections::HashMap;
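/// File formats the profiler can identify. Formats without a dedicated
/// variant are preserved in `Unknown`, which serde treats as an untagged
/// string so unrecognized values round-trip unchanged.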
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileFormat {
Csv,
Json,
Jsonl,
Parquet,
#[serde(untagged)]
Unknown(String),
}
impl std::fmt::Display for FileFormat {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Csv => write!(f, "csv"),
Self::Json => write!(f, "json"),
Self::Jsonl => write!(f, "jsonl"),
Self::Parquet => write!(f, "parquet"),
Self::Unknown(s) => write!(f, "{}", s),
}
}
}
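/// Query engines that can serve as a profiling source; engines not listed
/// here are carried verbatim in `Custom`.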
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum QueryEngine {
DataFusion,
Postgres,
MySql,
Sqlite,
Snowflake,
BigQuery,
#[serde(untagged)]
Custom(String),
}
impl std::fmt::Display for QueryEngine {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::DataFusion => write!(f, "datafusion"),
Self::Postgres => write!(f, "postgres"),
Self::MySql => write!(f, "mysql"),
Self::Sqlite => write!(f, "sqlite"),
Self::Snowflake => write!(f, "snowflake"),
Self::BigQuery => write!(f, "bigquery"),
Self::Custom(s) => write!(f, "{}", s),
}
}
}
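/// In-memory dataframe libraries that can hand data to the profiler.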
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DataFrameLibrary {
Pandas,
Polars,
PyArrow,
#[serde(untagged)]
Custom(String),
}
impl std::fmt::Display for DataFrameLibrary {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Pandas => write!(f, "pandas"),
Self::Polars => write!(f, "polars"),
Self::PyArrow => write!(f, "pyarrow"),
Self::Custom(s) => write!(f, "{}", s),
}
}
}
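/// Streaming systems a profiled batch of records may originate from.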
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StreamSourceSystem {
Kafka,
Kinesis,
Pulsar,
    Http,
    WebSocket,
    #[serde(untagged)]
Custom(String),
}
impl std::fmt::Display for StreamSourceSystem {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Kafka => write!(f, "kafka"),
Self::Kinesis => write!(f, "kinesis"),
Self::Pulsar => write!(f, "pulsar"),
Self::Http => write!(f, "http"),
Self::WebSocket => write!(f, "websocket"),
Self::Custom(s) => write!(f, "{}", s),
}
}
}
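/// Origin of the profiled data. Serialized with an internal `type` tag
/// (`file`, `query`, `dataframe`, or `stream`).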
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DataSource {
File {
path: String,
format: FileFormat,
size_bytes: u64,
#[serde(skip_serializing_if = "Option::is_none")]
modified_at: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
parquet_metadata: Option<ParquetMetadata>,
},
Query {
engine: QueryEngine,
statement: String,
#[serde(skip_serializing_if = "Option::is_none")]
database: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
execution_id: Option<String>,
},
#[serde(rename = "dataframe")]
DataFrame {
name: String,
source_library: DataFrameLibrary,
row_count: usize,
column_count: usize,
#[serde(skip_serializing_if = "Option::is_none")]
memory_bytes: Option<u64>,
},
Stream {
topic: String,
batch_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
partition: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
consumer_group: Option<String>,
source_system: StreamSourceSystem,
#[serde(skip_serializing_if = "Option::is_none")]
session_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
first_record_at: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
last_record_at: Option<String>,
},
}
impl DataSource {
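    /// Human-readable identifier for the source: the file path, the engine
    /// plus a truncated statement for queries, `library[name]` for
    /// dataframes, and `system[topic]-batch:id` for streams.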
pub fn identifier(&self) -> String {
match self {
Self::File { path, .. } => path.clone(),
Self::Query {
engine, statement, ..
} => {
                // Truncate long statements on a character boundary so
                // multi-byte UTF-8 input cannot cause a slicing panic.
                let truncated = if statement.chars().count() > 50 {
                    let prefix: String = statement.chars().take(47).collect();
                    format!("{}...", prefix)
                } else {
                    statement.clone()
                };
format!("{}: {}", engine, truncated)
}
Self::DataFrame {
name,
source_library,
..
} => format!("{}[{}]", source_library, name),
Self::Stream {
source_system,
topic,
batch_id,
..
} => format!("{}[{}]-batch:{}", source_system, topic, batch_id),
}
}
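    /// Data size in MB (bytes / 1_048_576) where known: file size for files,
    /// in-memory size for dataframes, `None` for queries and streams.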
pub fn size_mb(&self) -> Option<f64> {
match self {
Self::File { size_bytes, .. } => Some(*size_bytes as f64 / 1_048_576.0),
Self::DataFrame { memory_bytes, .. } => memory_bytes.map(|b| b as f64 / 1_048_576.0),
Self::Query { .. } | Self::Stream { .. } => None,
}
}
pub fn is_file(&self) -> bool {
matches!(self, Self::File { .. })
}
pub fn is_query(&self) -> bool {
matches!(self, Self::Query { .. })
}
pub fn is_dataframe(&self) -> bool {
matches!(self, Self::DataFrame { .. })
}
pub fn is_stream(&self) -> bool {
matches!(self, Self::Stream { .. })
}
pub fn file_path(&self) -> Option<&str> {
match self {
Self::File { path, .. } => Some(path),
_ => None,
}
}
pub fn stream_topic(&self) -> Option<&str> {
match self {
Self::Stream { topic, .. } => Some(topic),
_ => None,
}
}
pub fn batch_id(&self) -> Option<&str> {
match self {
Self::Stream { batch_id, .. } => Some(batch_id),
_ => None,
}
}
}
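/// The five data-quality dimensions evaluated by the profiler.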
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum QualityDimension {
Completeness,
Consistency,
Uniqueness,
Accuracy,
Timeliness,
}
impl QualityDimension {
pub fn all() -> Vec<Self> {
vec![
Self::Completeness,
Self::Consistency,
Self::Uniqueness,
Self::Accuracy,
Self::Timeliness,
]
}
}
impl std::str::FromStr for QualityDimension {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"completeness" => Ok(Self::Completeness),
"consistency" => Ok(Self::Consistency),
"uniqueness" => Ok(Self::Uniqueness),
"accuracy" => Ok(Self::Accuracy),
"timeliness" => Ok(Self::Timeliness),
_ => Err(format!("Unknown quality dimension: {s}")),
}
}
}
impl std::fmt::Display for QualityDimension {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Completeness => write!(f, "completeness"),
Self::Consistency => write!(f, "consistency"),
Self::Uniqueness => write!(f, "uniqueness"),
Self::Accuracy => write!(f, "accuracy"),
Self::Timeliness => write!(f, "timeliness"),
}
}
}
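/// Completeness metrics; `complete_records_ratio` is a percentage in the
/// 0.0-100.0 range, as assumed by `QualityMetrics::overall_score`.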
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct CompletenessMetrics {
#[serde(serialize_with = "crate::serde_helpers::round_2")]
pub missing_values_ratio: f64,
#[serde(serialize_with = "crate::serde_helpers::round_2")]
pub complete_records_ratio: f64,
pub null_columns: Vec<String>,
}
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct ConsistencyMetrics {
#[serde(serialize_with = "crate::serde_helpers::round_2")]
pub data_type_consistency: f64,
pub format_violations: usize,
pub encoding_issues: usize,
}
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct UniquenessMetrics {
pub duplicate_rows: usize,
#[serde(serialize_with = "crate::serde_helpers::round_2")]
pub key_uniqueness: f64,
pub high_cardinality_warning: bool,
}
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct AccuracyMetrics {
#[serde(serialize_with = "crate::serde_helpers::round_2")]
pub outlier_ratio: f64,
pub range_violations: usize,
pub negative_values_in_positive: usize,
}
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct TimelinessMetrics {
pub future_dates_count: usize,
#[serde(serialize_with = "crate::serde_helpers::round_2")]
pub stale_data_ratio: f64,
pub temporal_violations: usize,
}
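/// Per-dimension quality metrics. Dimensions that were not evaluated stay
/// `None` and are omitted from serialized output.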
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct QualityMetrics {
#[serde(skip_serializing_if = "Option::is_none")]
pub completeness: Option<CompletenessMetrics>,
#[serde(skip_serializing_if = "Option::is_none")]
pub consistency: Option<ConsistencyMetrics>,
#[serde(skip_serializing_if = "Option::is_none")]
pub uniqueness: Option<UniquenessMetrics>,
#[serde(skip_serializing_if = "Option::is_none")]
pub accuracy: Option<AccuracyMetrics>,
#[serde(skip_serializing_if = "Option::is_none")]
pub timeliness: Option<TimelinessMetrics>,
}
impl QualityMetrics {
pub fn empty() -> Self {
Self {
completeness: Some(CompletenessMetrics {
missing_values_ratio: 0.0,
complete_records_ratio: 100.0,
null_columns: vec![],
}),
consistency: Some(ConsistencyMetrics {
data_type_consistency: 100.0,
format_violations: 0,
encoding_issues: 0,
}),
uniqueness: Some(UniquenessMetrics {
duplicate_rows: 0,
key_uniqueness: 100.0,
high_cardinality_warning: false,
}),
accuracy: Some(AccuracyMetrics {
outlier_ratio: 0.0,
range_violations: 0,
negative_values_in_positive: 0,
}),
timeliness: Some(TimelinessMetrics {
future_dates_count: 0,
stale_data_ratio: 0.0,
temporal_violations: 0,
}),
}
}
pub fn calculate_from_data(
data: &HashMap<String, Vec<String>>,
column_profiles: &[ColumnProfile],
) -> Result<Self, DataProfilerError> {
let calculator = crate::analysis::MetricsCalculator::new();
calculator.calculate_comprehensive_metrics(data, column_profiles, None)
}
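    /// Weighted overall quality score in the 0.0-100.0 range. Weights:
    /// completeness 0.30, consistency 0.25, uniqueness 0.20, accuracy 0.15,
    /// timeliness 0.10; the result is normalized by the weights of the
    /// dimensions actually present and clamped to 100.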
pub fn overall_score(&self) -> f64 {
let mut total_weight = 0.0;
let mut score = 0.0;
if let Some(c) = &self.completeness {
total_weight += 0.3;
score += c.complete_records_ratio * 0.3;
}
if let Some(c) = &self.consistency {
total_weight += 0.25;
score += c.data_type_consistency * 0.25;
}
if let Some(u) = &self.uniqueness {
total_weight += 0.2;
score += u.key_uniqueness * 0.2;
}
if let Some(a) = &self.accuracy {
total_weight += 0.15;
score += (100.0 - a.outlier_ratio) * 0.15;
}
if let Some(t) = &self.timeliness {
total_weight += 0.1;
score += (100.0 - t.stale_data_ratio) * 0.1;
}
if total_weight > 0.0 {
(score / total_weight).min(100.0)
} else {
0.0
}
}
pub fn missing_values_ratio(&self) -> f64 {
self.completeness
.as_ref()
.map_or(0.0, |c| c.missing_values_ratio)
}
pub fn complete_records_ratio(&self) -> f64 {
self.completeness
.as_ref()
.map_or(100.0, |c| c.complete_records_ratio)
}
pub fn null_columns(&self) -> &[String] {
self.completeness.as_ref().map_or(&[], |c| &c.null_columns)
}
pub fn data_type_consistency(&self) -> f64 {
self.consistency
.as_ref()
.map_or(100.0, |c| c.data_type_consistency)
}
pub fn format_violations(&self) -> usize {
self.consistency.as_ref().map_or(0, |c| c.format_violations)
}
pub fn encoding_issues(&self) -> usize {
self.consistency.as_ref().map_or(0, |c| c.encoding_issues)
}
pub fn duplicate_rows(&self) -> usize {
self.uniqueness.as_ref().map_or(0, |u| u.duplicate_rows)
}
pub fn key_uniqueness(&self) -> f64 {
self.uniqueness.as_ref().map_or(100.0, |u| u.key_uniqueness)
}
pub fn high_cardinality_warning(&self) -> bool {
self.uniqueness
.as_ref()
.is_some_and(|u| u.high_cardinality_warning)
}
pub fn outlier_ratio(&self) -> f64 {
self.accuracy.as_ref().map_or(0.0, |a| a.outlier_ratio)
}
pub fn range_violations(&self) -> usize {
self.accuracy.as_ref().map_or(0, |a| a.range_violations)
}
pub fn negative_values_in_positive(&self) -> usize {
self.accuracy
.as_ref()
.map_or(0, |a| a.negative_values_in_positive)
}
pub fn future_dates_count(&self) -> usize {
self.timeliness.as_ref().map_or(0, |t| t.future_dates_count)
}
pub fn stale_data_ratio(&self) -> f64 {
self.timeliness.as_ref().map_or(0.0, |t| t.stale_data_ratio)
}
pub fn temporal_violations(&self) -> usize {
self.timeliness
.as_ref()
.map_or(0, |t| t.temporal_violations)
}
}
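/// How the quality metrics were obtained: from the full dataset, from a
/// sample, or from a mix of exact and sampled dimensions.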
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum MetricConfidence {
Exact,
Approximate {
sample_size: usize,
population_size: Option<usize>,
},
Mixed {
exact_dimensions: Vec<String>,
sampled_dimensions: Vec<String>,
sample_size: usize,
},
}
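/// Quality metrics paired with the confidence of how they were computed.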
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct QualityAssessment {
pub metrics: QualityMetrics,
pub confidence: MetricConfidence,
}
impl QualityAssessment {
pub fn exact(metrics: QualityMetrics) -> Self {
Self {
metrics,
confidence: MetricConfidence::Exact,
}
}
pub fn approximate(
metrics: QualityMetrics,
sample_size: usize,
population_size: Option<usize>,
) -> Self {
Self {
metrics,
confidence: MetricConfidence::Approximate {
sample_size,
population_size,
},
}
}
pub fn score(&self) -> f64 {
self.metrics.overall_score()
}
}
impl From<QualityMetrics> for QualityAssessment {
fn from(metrics: QualityMetrics) -> Self {
Self::exact(metrics)
}
}
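/// Top-level profiling result: data source, per-column profiles, execution
/// metadata, and an optional quality assessment. Legacy field names
/// (`scan_info`, `data_quality_metrics`) are accepted on deserialization.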
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProfileReport {
pub id: String,
pub timestamp: String,
pub data_source: DataSource,
pub column_profiles: Vec<ColumnProfile>,
#[serde(alias = "scan_info")]
pub execution: ExecutionMetadata,
#[serde(
alias = "data_quality_metrics",
skip_serializing_if = "Option::is_none",
default,
deserialize_with = "deserialize_quality_compat"
)]
pub quality: Option<QualityAssessment>,
}
impl ProfileReport {
pub fn new(
data_source: DataSource,
column_profiles: Vec<ColumnProfile>,
execution: ExecutionMetadata,
quality: Option<QualityAssessment>,
) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
data_source,
column_profiles,
execution,
quality,
}
}
pub fn with_id(mut self, id: impl Into<String>) -> Self {
self.id = id.into();
self
}
pub fn with_timestamp(mut self, timestamp: impl Into<String>) -> Self {
self.timestamp = timestamp.into();
self
}
pub fn quality_score(&self) -> Option<f64> {
self.quality.as_ref().map(|q| q.score())
}
pub fn source_identifier(&self) -> String {
self.data_source.identifier()
}
}
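/// Accepts either the current `QualityAssessment` shape or a legacy bare
/// `QualityMetrics` object, wrapping the latter as an exact assessment.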
fn deserialize_quality_compat<'de, D>(
deserializer: D,
) -> Result<Option<QualityAssessment>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::Deserialize;
let value: Option<serde_json::Value> = Option::deserialize(deserializer)?;
match value {
None => Ok(None),
Some(v) => {
if v.get("metrics").is_some() && v.get("confidence").is_some() {
let assessment: QualityAssessment =
serde_json::from_value(v).map_err(serde::de::Error::custom)?;
Ok(Some(assessment))
} else {
let metrics: QualityMetrics =
serde_json::from_value(v).map_err(serde::de::Error::custom)?;
Ok(Some(QualityAssessment::exact(metrics)))
}
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ParquetMetadata {
pub num_row_groups: usize,
pub compression: String,
pub version: i32,
pub schema_summary: String,
pub compressed_size_bytes: u64,
pub uncompressed_size_bytes: Option<u64>,
}
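/// Why scanning stopped before the source was fully read.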
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum TruncationReason {
MaxRows(u64),
MaxBytes(u64),
MemoryPressure,
StopCondition(String),
StreamClosed,
Timeout,
}
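/// Runtime statistics for a single profiling run.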
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExecutionMetadata {
pub rows_processed: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub bytes_consumed: Option<u64>,
pub columns_detected: usize,
pub scan_time_ms: u128,
#[serde(skip_serializing_if = "Option::is_none")]
pub throughput_rows_sec: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub memory_peak_mb: Option<f64>,
pub error_count: usize,
pub source_exhausted: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub truncation_reason: Option<TruncationReason>,
pub sampling_applied: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub sampling_ratio: Option<f64>,
}
impl ExecutionMetadata {
pub fn new(rows_processed: usize, columns_detected: usize, scan_time_ms: u128) -> Self {
let throughput_rows_sec = if scan_time_ms > 0 {
Some(rows_processed as f64 / (scan_time_ms as f64 / 1000.0))
} else {
None
};
Self {
rows_processed,
bytes_consumed: None,
columns_detected,
scan_time_ms,
throughput_rows_sec,
memory_peak_mb: None,
error_count: 0,
source_exhausted: true,
truncation_reason: None,
sampling_applied: false,
sampling_ratio: None,
}
}
pub fn with_sampling(mut self, ratio: f64) -> Self {
self.sampling_applied = true;
self.sampling_ratio = Some(ratio);
self
}
pub fn with_source_exhausted(mut self, exhausted: bool) -> Self {
self.source_exhausted = exhausted;
self
}
pub fn with_truncation(mut self, reason: TruncationReason) -> Self {
self.source_exhausted = false;
self.truncation_reason = Some(reason);
self
}
pub fn with_bytes_consumed(mut self, bytes: u64) -> Self {
self.bytes_consumed = Some(bytes);
self
}
pub fn with_error_count(mut self, count: usize) -> Self {
self.error_count = count;
self
}
pub fn with_memory_peak_mb(mut self, mb: f64) -> Self {
self.memory_peak_mb = Some(mb);
self
}
}
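/// Profile of a single column: counts, type-specific statistics, and any
/// detected value patterns.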
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ColumnProfile {
pub name: String,
pub data_type: DataType,
pub null_count: usize,
pub total_count: usize,
pub unique_count: Option<usize>,
pub stats: ColumnStats,
pub patterns: Vec<Pattern>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum DataType {
String,
Integer,
Float,
Date,
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct Quartiles {
pub q1: f64,
pub q2: f64,
pub q3: f64,
pub iqr: f64,
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct FrequencyItem {
pub value: String,
pub count: usize,
#[serde(serialize_with = "crate::serde_helpers::round_2")]
pub percentage: f64,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NumericStats {
#[serde(serialize_with = "crate::serde_helpers::round_2")]
pub min: f64,
#[serde(serialize_with = "crate::serde_helpers::round_2")]
pub max: f64,
#[serde(serialize_with = "crate::serde_helpers::round_4")]
pub mean: f64,
#[serde(serialize_with = "crate::serde_helpers::round_4")]
pub std_dev: f64,
#[serde(serialize_with = "crate::serde_helpers::round_4")]
pub variance: f64,
#[serde(
skip_serializing_if = "Option::is_none",
serialize_with = "crate::serde_helpers::round_2_opt"
)]
pub median: Option<f64>,
#[serde(
skip_serializing_if = "Option::is_none",
serialize_with = "crate::serde_helpers::quartiles::serialize"
)]
pub quartiles: Option<Quartiles>,
#[serde(
skip_serializing_if = "Option::is_none",
serialize_with = "crate::serde_helpers::round_2_opt"
)]
pub mode: Option<f64>,
#[serde(
skip_serializing_if = "Option::is_none",
serialize_with = "crate::serde_helpers::round_2_opt"
)]
pub coefficient_of_variation: Option<f64>,
#[serde(
skip_serializing_if = "Option::is_none",
serialize_with = "crate::serde_helpers::round_4_opt"
)]
pub skewness: Option<f64>,
#[serde(
skip_serializing_if = "Option::is_none",
serialize_with = "crate::serde_helpers::round_4_opt"
)]
pub kurtosis: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub is_approximate: Option<bool>,
}
impl NumericStats {
pub fn empty() -> Self {
Self {
min: 0.0,
max: 0.0,
mean: 0.0,
std_dev: 0.0,
variance: 0.0,
median: None,
quartiles: None,
mode: None,
coefficient_of_variation: None,
skewness: None,
kurtosis: None,
is_approximate: None,
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TextStats {
pub min_length: usize,
pub max_length: usize,
#[serde(serialize_with = "crate::serde_helpers::round_2")]
pub avg_length: f64,
#[serde(skip_serializing_if = "Option::is_none")]
pub most_frequent: Option<Vec<FrequencyItem>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub least_frequent: Option<Vec<FrequencyItem>>,
}
impl TextStats {
pub fn empty() -> Self {
Self {
min_length: 0,
max_length: 0,
avg_length: 0.0,
most_frequent: None,
least_frequent: None,
}
}
pub fn from_lengths(min_length: usize, max_length: usize, avg_length: f64) -> Self {
Self {
min_length: if min_length == usize::MAX {
0
} else {
min_length
},
max_length,
avg_length,
most_frequent: None,
least_frequent: None,
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DateTimeStats {
pub min_datetime: String,
pub max_datetime: String,
#[serde(serialize_with = "crate::serde_helpers::round_2")]
pub duration_days: f64,
pub year_distribution: HashMap<i32, usize>,
pub month_distribution: HashMap<u32, usize>,
pub day_of_week_distribution: HashMap<String, usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub hour_distribution: Option<HashMap<u32, usize>>,
}
impl DateTimeStats {
pub fn empty() -> Self {
Self {
min_datetime: String::new(),
max_datetime: String::new(),
duration_days: 0.0,
year_distribution: HashMap::new(),
month_distribution: HashMap::new(),
day_of_week_distribution: HashMap::new(),
hour_distribution: None,
}
}
}
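/// Type-specific statistics attached to a `ColumnProfile`.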
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ColumnStats {
Numeric(NumericStats),
Text(TextStats),
DateTime(DateTimeStats),
}
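/// A named regex pattern and how often it matched within a column.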
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Pattern {
pub name: String,
pub regex: String,
pub match_count: usize,
pub match_percentage: f64,
}
#[derive(Clone, Debug)]
pub enum OutputFormat {
Text,
Json,
Csv,
Plain,
}
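/// Result of schema inference over a sample of rows.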
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SchemaResult {
pub columns: Vec<ColumnSchema>,
pub rows_sampled: usize,
pub inference_time_ms: u128,
pub schema_stable: bool,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ColumnSchema {
pub name: String,
pub data_type: DataType,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RowCountEstimate {
pub count: u64,
pub exact: bool,
pub method: CountMethod,
pub count_time_ms: u128,
}
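/// Strategy used to produce a row count.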
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum CountMethod {
ParquetMetadata,
FullScan,
Sampling,
StreamFullScan,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_empty_metrics_perfect_score() {
let metrics = QualityMetrics::empty();
assert!((metrics.overall_score() - 100.0).abs() < 0.01);
}
#[test]
fn test_quality_score_weights_sum_to_100() {
let metrics = QualityMetrics::empty();
assert!((metrics.overall_score() - 100.0).abs() < 0.01);
}
#[test]
fn test_quality_score_completeness_weight() {
let mut metrics = QualityMetrics::empty();
if let Some(ref mut c) = metrics.completeness {
c.complete_records_ratio = 0.0;
}
assert!((metrics.overall_score() - 70.0).abs() < 0.01);
}
#[test]
fn test_quality_score_all_bad() {
let metrics = QualityMetrics {
completeness: Some(CompletenessMetrics {
complete_records_ratio: 0.0,
..CompletenessMetrics::default()
}),
consistency: Some(ConsistencyMetrics {
data_type_consistency: 0.0,
..ConsistencyMetrics::default()
}),
uniqueness: Some(UniquenessMetrics {
key_uniqueness: 0.0,
..UniquenessMetrics::default()
}),
accuracy: Some(AccuracyMetrics {
outlier_ratio: 100.0,
..AccuracyMetrics::default()
}),
timeliness: Some(TimelinessMetrics {
stale_data_ratio: 100.0,
..TimelinessMetrics::default()
}),
};
assert!((metrics.overall_score() - 0.0).abs() < 0.01);
}
#[test]
fn test_column_profile_json_roundtrip() {
let profile = ColumnProfile {
name: "test_col".to_string(),
data_type: DataType::Integer,
null_count: 2,
total_count: 10,
unique_count: Some(8),
stats: ColumnStats::Numeric(NumericStats {
min: 1.0,
max: 100.0,
mean: 50.5,
std_dev: 28.87,
variance: 833.25,
median: Some(50.0),
quartiles: Some(Quartiles {
q1: 25.0,
q2: 50.0,
q3: 75.0,
iqr: 50.0,
}),
mode: Some(42.0),
coefficient_of_variation: Some(57.17),
skewness: Some(0.0),
kurtosis: Some(-1.2),
is_approximate: Some(false),
}),
patterns: vec![],
};
let json = serde_json::to_string(&profile).unwrap();
let deserialized: ColumnProfile = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.name, "test_col");
assert_eq!(deserialized.data_type, DataType::Integer);
assert_eq!(deserialized.total_count, 10);
assert_eq!(deserialized.null_count, 2);
if let ColumnStats::Numeric(n) = &deserialized.stats {
assert!((n.min - 1.0).abs() < 0.01);
assert!((n.max - 100.0).abs() < 0.01);
assert!((n.mean - 50.5).abs() < 0.01);
assert!(n.median.is_some());
assert!(n.quartiles.is_some());
} else {
panic!("Expected Numeric stats after roundtrip");
}
}
#[test]
fn test_text_stats_json_roundtrip() {
let profile = ColumnProfile {
name: "name".to_string(),
data_type: DataType::String,
null_count: 0,
total_count: 3,
unique_count: Some(3),
stats: ColumnStats::Text(TextStats {
min_length: 3,
max_length: 7,
avg_length: 5.0,
most_frequent: None,
least_frequent: None,
}),
patterns: vec![],
};
let json = serde_json::to_string(&profile).unwrap();
let deserialized: ColumnProfile = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.data_type, DataType::String);
if let ColumnStats::Text(t) = &deserialized.stats {
assert_eq!(t.min_length, 3);
assert_eq!(t.max_length, 7);
} else {
panic!("Expected Text stats after roundtrip");
}
}
#[test]
fn test_profile_report_json_roundtrip() {
let report = ProfileReport::new(
DataSource::File {
path: "test.csv".to_string(),
format: FileFormat::Csv,
size_bytes: 1024,
modified_at: None,
parquet_metadata: None,
},
vec![],
ExecutionMetadata::new(100, 5, 50),
Some(QualityAssessment::exact(QualityMetrics::empty())),
);
let json = serde_json::to_string(&report).unwrap();
let deserialized: ProfileReport = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.execution.rows_processed, 100);
assert_eq!(deserialized.execution.columns_detected, 5);
assert!((deserialized.quality_score().unwrap() - 100.0).abs() < 0.01);
}
#[test]
fn test_profile_report_without_quality() {
let report = ProfileReport::new(
DataSource::File {
path: "test.csv".to_string(),
format: FileFormat::Csv,
size_bytes: 0,
modified_at: None,
parquet_metadata: None,
},
vec![],
ExecutionMetadata::new(0, 0, 0),
None,
);
assert!(report.quality_score().is_none());
assert!(report.quality.is_none());
let json = serde_json::to_string(&report).unwrap();
let deserialized: ProfileReport = serde_json::from_str(&json).unwrap();
assert!(deserialized.quality.is_none());
}
#[test]
fn test_execution_metadata_throughput_calculation() {
        let meta = ExecutionMetadata::new(1000, 5, 500);
        assert!(meta.throughput_rows_sec.is_some());
assert!((meta.throughput_rows_sec.unwrap() - 2000.0).abs() < 1.0);
assert!(meta.source_exhausted);
assert!(!meta.sampling_applied);
assert!(meta.sampling_ratio.is_none());
}
#[test]
fn test_execution_metadata_zero_time_no_throughput() {
let meta = ExecutionMetadata::new(100, 3, 0);
assert!(meta.throughput_rows_sec.is_none());
}
#[test]
fn test_execution_metadata_with_sampling() {
let meta = ExecutionMetadata::new(500, 3, 100).with_sampling(0.5);
assert!(meta.sampling_applied);
assert_eq!(meta.sampling_ratio, Some(0.5));
}
#[test]
fn test_execution_metadata_with_truncation() {
let meta =
ExecutionMetadata::new(1000, 5, 200).with_truncation(TruncationReason::MaxRows(1000));
assert!(!meta.source_exhausted);
assert!(meta.truncation_reason.is_some());
}
#[test]
fn test_truncation_reason_serde_roundtrip() {
let reasons = vec![
TruncationReason::MaxRows(5000),
TruncationReason::MaxBytes(1_000_000),
TruncationReason::MemoryPressure,
TruncationReason::StopCondition("accuracy > 0.95".to_string()),
TruncationReason::StreamClosed,
TruncationReason::Timeout,
];
for reason in reasons {
let json = serde_json::to_string(&reason).unwrap();
let deserialized: TruncationReason = serde_json::from_str(&json).unwrap();
let json2 = serde_json::to_string(&deserialized).unwrap();
assert_eq!(json, json2);
}
}
#[test]
fn test_data_source_file_identifier() {
let ds = DataSource::File {
path: "/path/to/data.csv".to_string(),
format: FileFormat::Csv,
size_bytes: 0,
modified_at: None,
parquet_metadata: None,
};
assert_eq!(ds.identifier(), "/path/to/data.csv");
assert!(ds.is_file());
assert!(!ds.is_query());
assert!(!ds.is_dataframe());
assert!(!ds.is_stream());
}
#[test]
fn test_data_source_stream_identifier_and_helpers() {
let ds = DataSource::Stream {
topic: "events".to_string(),
batch_id: "b1".to_string(),
partition: Some(0),
consumer_group: None,
source_system: StreamSourceSystem::Kafka,
session_id: None,
first_record_at: None,
last_record_at: None,
};
assert_eq!(ds.identifier(), "kafka[events]-batch:b1");
assert!(ds.is_stream());
assert_eq!(ds.stream_topic(), Some("events"));
assert_eq!(ds.batch_id(), Some("b1"));
assert!(!ds.is_file());
assert!(!ds.is_query());
assert!(ds.size_mb().is_none());
}
#[test]
fn test_stream_json_serialization() {
let ds = DataSource::Stream {
topic: "sensor-data".to_string(),
batch_id: "batch-789".to_string(),
partition: Some(2),
consumer_group: Some("processing-group".to_string()),
source_system: StreamSourceSystem::Kinesis,
session_id: Some("session-1".to_string()),
first_record_at: Some("2023-01-01T10:00:00Z".to_string()),
last_record_at: Some("2023-01-01T10:05:00Z".to_string()),
};
let json = serde_json::to_string(&ds).unwrap();
assert!(json.contains(r#""type":"stream""#));
assert!(json.contains(r#""source_system":"kinesis""#));
assert!(json.contains(r#""topic":"sensor-data""#));
let deserialized: DataSource = serde_json::from_str(&json).unwrap();
assert!(deserialized.is_stream());
assert_eq!(deserialized.stream_topic(), Some("sensor-data"));
}
#[test]
fn test_partial_dimensions_only_completeness() {
let metrics = QualityMetrics {
completeness: Some(CompletenessMetrics {
complete_records_ratio: 100.0,
missing_values_ratio: 0.0,
null_columns: vec![],
}),
consistency: None,
uniqueness: None,
accuracy: None,
timeliness: None,
};
assert!(metrics.completeness.is_some());
assert!(metrics.consistency.is_none());
assert!(metrics.uniqueness.is_none());
assert!(metrics.accuracy.is_none());
assert!(metrics.timeliness.is_none());
assert!((metrics.overall_score() - 100.0).abs() < 0.01);
}
#[test]
fn test_partial_dimensions_two_dimensions() {
let metrics = QualityMetrics {
completeness: Some(CompletenessMetrics {
complete_records_ratio: 50.0,
..CompletenessMetrics::default()
}),
consistency: None,
uniqueness: Some(UniquenessMetrics {
key_uniqueness: 80.0,
..UniquenessMetrics::default()
}),
accuracy: None,
timeliness: None,
};
assert!((metrics.overall_score() - 62.0).abs() < 0.01);
}
#[test]
fn test_all_dimensions_none_score_zero() {
let metrics = QualityMetrics {
completeness: None,
consistency: None,
uniqueness: None,
accuracy: None,
timeliness: None,
};
assert!((metrics.overall_score() - 0.0).abs() < 0.01);
}
#[test]
fn test_partial_dimensions_json_skips_none() {
let metrics = QualityMetrics {
completeness: Some(CompletenessMetrics::default()),
consistency: None,
uniqueness: None,
accuracy: None,
timeliness: None,
};
let json = serde_json::to_string(&metrics).unwrap();
assert!(json.contains("completeness"));
assert!(!json.contains("consistency"));
assert!(!json.contains("uniqueness"));
assert!(!json.contains("accuracy"));
assert!(!json.contains("timeliness"));
}
#[test]
fn test_partial_dimensions_flat_accessors_return_defaults() {
let metrics = QualityMetrics {
completeness: None,
consistency: None,
uniqueness: None,
accuracy: None,
timeliness: None,
};
assert!((metrics.complete_records_ratio() - 100.0).abs() < 0.01);
assert!((metrics.data_type_consistency() - 100.0).abs() < 0.01);
assert!((metrics.key_uniqueness() - 100.0).abs() < 0.01);
assert!((metrics.missing_values_ratio() - 0.0).abs() < 0.01);
assert_eq!(metrics.duplicate_rows(), 0);
assert!(!metrics.high_cardinality_warning());
}
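    // Illustrative sanity checks for the accessor helpers above; these are
    // small usage examples of the public API rather than exhaustive tests.
    #[test]
    fn test_query_identifier_truncates_long_statements() {
        let ds = DataSource::Query {
            engine: QueryEngine::Postgres,
            statement: "x".repeat(80),
            database: None,
            execution_id: None,
        };
        let id = ds.identifier();
        assert!(id.starts_with("postgres: "));
        assert!(id.ends_with("..."));
        // 47 retained characters plus the three-dot ellipsis.
        assert_eq!(id.len(), "postgres: ".len() + 50);
    }
    #[test]
    fn test_file_size_mb_conversion() {
        let ds = DataSource::File {
            path: "data.parquet".to_string(),
            format: FileFormat::Parquet,
            size_bytes: 2 * 1_048_576,
            modified_at: None,
            parquet_metadata: None,
        };
        assert!((ds.size_mb().unwrap() - 2.0).abs() < f64::EPSILON);
    }
    #[test]
    fn test_quality_dimension_parse_display_roundtrip() {
        for dim in QualityDimension::all() {
            let parsed: QualityDimension = dim.to_string().parse().unwrap();
            assert_eq!(parsed, dim);
        }
    }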
}