use pyo3::prelude::*;
use crate::types::{
ColumnProfile, ColumnStats, DataSource, DataType, Pattern, ProfileReport, QualityMetrics,
TruncationReason,
};
/// Python-visible view of a detected value pattern
/// (converted from `crate::types::Pattern`).
#[pyclass(name = "Pattern")]
#[derive(Clone)]
pub struct PyPattern {
    /// Pattern name as assigned by the detector.
    #[pyo3(get)]
    pub name: String,
    /// Regular expression text that defines the pattern.
    #[pyo3(get)]
    pub regex: String,
    /// Number of values that matched the pattern.
    #[pyo3(get)]
    pub match_count: usize,
    /// Share of values that matched; units (0-1 vs 0-100) are defined
    /// by the core `Pattern` type — not visible here.
    #[pyo3(get)]
    pub match_percentage: f64,
    /// Pattern category, stringified from the core category type.
    #[pyo3(get)]
    pub category: String,
    /// Detector confidence for this pattern.
    #[pyo3(get)]
    pub confidence: f64,
}
impl From<&Pattern> for PyPattern {
fn from(p: &Pattern) -> Self {
Self {
name: p.name.clone(),
regex: p.regex.clone(),
match_count: p.match_count,
match_percentage: p.match_percentage,
category: p.category.to_string(),
confidence: p.confidence,
}
}
}
/// Python-visible, flattened view of a core `ColumnProfile`.
///
/// Type-specific statistics (numeric / text / boolean) are exposed as
/// `Option` fields that are `None` for columns of other types.
#[pyclass(name = "ColumnProfile")]
#[derive(Clone)]
pub struct PyColumnProfile {
    /// Column name.
    #[pyo3(get)]
    pub name: String,
    /// Detected data type as a lowercase string:
    /// "integer", "float", "string", "date", or "boolean".
    #[pyo3(get)]
    pub data_type: String,
    /// Total number of values observed in the column.
    #[pyo3(get)]
    pub total_count: usize,
    /// Number of null values observed.
    #[pyo3(get)]
    pub null_count: usize,
    /// Number of distinct values, when computed.
    #[pyo3(get)]
    pub unique_count: Option<usize>,
    /// null_count / total_count * 100 (0.0 for an empty column).
    #[pyo3(get)]
    pub null_percentage: f64,
    /// unique_count / total_count (0.0 when unknown or column is empty).
    #[pyo3(get)]
    pub uniqueness_ratio: f64,
    // --- Numeric statistics: None unless the column stats are numeric. ---
    #[pyo3(get)]
    pub min: Option<f64>,
    #[pyo3(get)]
    pub max: Option<f64>,
    #[pyo3(get)]
    pub mean: Option<f64>,
    #[pyo3(get)]
    pub std_dev: Option<f64>,
    #[pyo3(get)]
    pub variance: Option<f64>,
    #[pyo3(get)]
    pub median: Option<f64>,
    #[pyo3(get)]
    pub mode: Option<f64>,
    #[pyo3(get)]
    pub skewness: Option<f64>,
    #[pyo3(get)]
    pub kurtosis: Option<f64>,
    #[pyo3(get)]
    pub coefficient_of_variation: Option<f64>,
    /// Quartile summary as a map with keys "q1", "q2", "q3", "iqr".
    #[pyo3(get)]
    pub quartiles: Option<std::collections::HashMap<String, f64>>,
    /// Whether numeric statistics were computed approximately.
    #[pyo3(get)]
    pub is_approximate: Option<bool>,
    // --- Text-length statistics: None unless the column stats are text. ---
    #[pyo3(get)]
    pub min_length: Option<usize>,
    #[pyo3(get)]
    pub max_length: Option<usize>,
    #[pyo3(get)]
    pub avg_length: Option<f64>,
    // --- Boolean tallies: None unless the column stats are boolean. ---
    #[pyo3(get)]
    pub true_count: Option<usize>,
    #[pyo3(get)]
    pub false_count: Option<usize>,
    #[pyo3(get)]
    pub true_ratio: Option<f64>,
    /// Detected patterns; None (not an empty list) when nothing was detected.
    #[pyo3(get)]
    pub patterns: Option<Vec<PyPattern>>,
}
impl From<&ColumnProfile> for PyColumnProfile {
    /// Flatten a core `ColumnProfile` into the Python-facing struct:
    /// derives the percentage/ratio fields and splits the `ColumnStats`
    /// enum into per-type `Option` fields.
    fn from(profile: &ColumnProfile) -> Self {
        // Percentage of nulls; guard against division by zero on empty columns.
        let null_percentage = if profile.total_count > 0 {
            (profile.null_count as f64 / profile.total_count as f64) * 100.0
        } else {
            0.0
        };
        // Distinct/total ratio; 0.0 when the count is unknown or the column is empty.
        let uniqueness_ratio = match profile.unique_count {
            Some(unique) if profile.total_count > 0 => {
                unique as f64 / profile.total_count as f64
            }
            _ => 0.0,
        };
        // Numeric summary fields: populated only for numeric column stats.
        let (
            min,
            max,
            mean,
            std_dev,
            variance,
            median,
            mode,
            skewness,
            kurtosis,
            coefficient_of_variation,
            quartiles,
            is_approximate,
        ) = match &profile.stats {
            ColumnStats::Numeric(n) => {
                // Expose quartiles to Python as a {"q1", "q2", "q3", "iqr"} map.
                let q_map = n.quartiles.as_ref().map(|q| {
                    let mut m = std::collections::HashMap::new();
                    m.insert("q1".to_string(), q.q1);
                    m.insert("q2".to_string(), q.q2);
                    m.insert("q3".to_string(), q.q3);
                    m.insert("iqr".to_string(), q.iqr);
                    m
                });
                (
                    Some(n.min),
                    Some(n.max),
                    Some(n.mean),
                    Some(n.std_dev),
                    Some(n.variance),
                    n.median,
                    n.mode,
                    n.skewness,
                    n.kurtosis,
                    n.coefficient_of_variation,
                    q_map,
                    n.is_approximate,
                )
            }
            // All non-numeric stats carry no numeric summary. (The original
            // spelled out a separate `Boolean` arm with an identical body;
            // it is folded into this catch-all.)
            _ => (
                None, None, None, None, None, None, None, None, None, None, None, None,
            ),
        };
        // Text-length statistics: populated only for text column stats.
        let (min_length, max_length, avg_length) = match &profile.stats {
            ColumnStats::Text(t) => (Some(t.min_length), Some(t.max_length), Some(t.avg_length)),
            _ => (None, None, None),
        };
        // Boolean tallies: populated only for boolean column stats.
        let (true_count, false_count, true_ratio) = match &profile.stats {
            ColumnStats::Boolean(b) => {
                (Some(b.true_count), Some(b.false_count), Some(b.true_ratio))
            }
            _ => (None, None, None),
        };
        // An empty pattern list is surfaced to Python as None rather than [].
        let patterns = if profile.patterns.is_empty() {
            None
        } else {
            Some(profile.patterns.iter().map(PyPattern::from).collect())
        };
        Self {
            name: profile.name.clone(),
            // Exhaustive on purpose: a new DataType variant must force an update here.
            data_type: match profile.data_type {
                DataType::Integer => "integer".to_string(),
                DataType::Float => "float".to_string(),
                DataType::String => "string".to_string(),
                DataType::Date => "date".to_string(),
                DataType::Boolean => "boolean".to_string(),
            },
            total_count: profile.total_count,
            null_count: profile.null_count,
            unique_count: profile.unique_count,
            null_percentage,
            uniqueness_ratio,
            min,
            max,
            mean,
            std_dev,
            variance,
            median,
            mode,
            skewness,
            kurtosis,
            coefficient_of_variation,
            quartiles,
            is_approximate,
            min_length,
            max_length,
            avg_length,
            true_count,
            false_count,
            true_ratio,
            patterns,
        }
    }
}
/// Python-visible wrapper around the core `QualityMetrics`.
///
/// Stores a cloned copy and exposes its accessors via `#[getter]` methods.
#[pyclass(name = "DataQualityMetrics")]
#[derive(Clone)]
pub struct PyDataQualityMetrics {
    // Owned copy of the core metrics; all getters delegate to it.
    inner: QualityMetrics,
}
impl From<&QualityMetrics> for PyDataQualityMetrics {
    /// Wrap a cloned copy of the core metrics for exposure to Python.
    fn from(metrics: &QualityMetrics) -> Self {
        let inner = metrics.clone();
        Self { inner }
    }
}
#[pymethods]
impl PyDataQualityMetrics {
    // The scalar getters below delegate directly to the core
    // `QualityMetrics` accessors; value ranges and units (0-1 ratio vs
    // 0-100 percentage) are defined by the core type, not visible here.
    /// Ratio of missing values across the dataset.
    #[getter]
    fn missing_values_ratio(&self) -> f64 {
        self.inner.missing_values_ratio()
    }
    /// Ratio of records with no missing fields.
    #[getter]
    fn complete_records_ratio(&self) -> f64 {
        self.inner.complete_records_ratio()
    }
    /// Names of columns flagged as null by the core metrics.
    #[getter]
    fn null_columns(&self) -> Vec<String> {
        self.inner.null_columns().to_vec()
    }
    /// Data-type consistency measure.
    #[getter]
    fn data_type_consistency(&self) -> f64 {
        self.inner.data_type_consistency()
    }
    /// Count of format violations.
    #[getter]
    fn format_violations(&self) -> usize {
        self.inner.format_violations()
    }
    /// Count of encoding issues.
    #[getter]
    fn encoding_issues(&self) -> usize {
        self.inner.encoding_issues()
    }
    /// Count of duplicate rows.
    #[getter]
    fn duplicate_rows(&self) -> usize {
        self.inner.duplicate_rows()
    }
    /// Key-uniqueness measure.
    #[getter]
    fn key_uniqueness(&self) -> f64 {
        self.inner.key_uniqueness()
    }
    /// Whether the core metrics flagged high cardinality.
    #[getter]
    fn high_cardinality_warning(&self) -> bool {
        self.inner.high_cardinality_warning()
    }
    /// Ratio of outlier values.
    #[getter]
    fn outlier_ratio(&self) -> f64 {
        self.inner.outlier_ratio()
    }
    /// Count of range violations.
    #[getter]
    fn range_violations(&self) -> usize {
        self.inner.range_violations()
    }
    /// Count of negative values found in columns expected to be positive.
    #[getter]
    fn negative_values_in_positive(&self) -> usize {
        self.inner.negative_values_in_positive()
    }
    /// Count of dates in the future.
    #[getter]
    fn future_dates_count(&self) -> usize {
        self.inner.future_dates_count()
    }
    /// Ratio of stale data.
    #[getter]
    fn stale_data_ratio(&self) -> f64 {
        self.inner.stale_data_ratio()
    }
    /// Count of temporal-ordering violations.
    #[getter]
    fn temporal_violations(&self) -> usize {
        self.inner.temporal_violations()
    }
    // The dict getters below expose optional metric sections. Each returns
    // None when the section was not computed; otherwise a Python dict of
    // metric name -> value. Values of mixed types are erased to Py<PyAny>.
    /// Completeness section: "missing_values_ratio",
    /// "complete_records_ratio", "null_columns".
    #[getter]
    fn completeness(
        &self,
        py: Python<'_>,
    ) -> PyResult<Option<std::collections::HashMap<String, Py<PyAny>>>> {
        self.inner
            .completeness
            .as_ref()
            .map(|c| -> PyResult<_> {
                let mut m = std::collections::HashMap::new();
                m.insert(
                    "missing_values_ratio".into(),
                    c.missing_values_ratio
                        .into_pyobject(py)?
                        .unbind()
                        .into_any(),
                );
                m.insert(
                    "complete_records_ratio".into(),
                    c.complete_records_ratio
                        .into_pyobject(py)?
                        .unbind()
                        .into_any(),
                );
                m.insert(
                    "null_columns".into(),
                    c.null_columns
                        .as_slice()
                        .into_pyobject(py)?
                        .unbind()
                        .into_any(),
                );
                Ok(m)
            })
            .transpose()
    }
    /// Consistency section: "data_type_consistency",
    /// "format_violations", "encoding_issues".
    #[getter]
    fn consistency(
        &self,
        py: Python<'_>,
    ) -> PyResult<Option<std::collections::HashMap<String, Py<PyAny>>>> {
        self.inner
            .consistency
            .as_ref()
            .map(|c| -> PyResult<_> {
                let mut m = std::collections::HashMap::new();
                m.insert(
                    "data_type_consistency".into(),
                    c.data_type_consistency
                        .into_pyobject(py)?
                        .unbind()
                        .into_any(),
                );
                m.insert(
                    "format_violations".into(),
                    c.format_violations.into_pyobject(py)?.unbind().into_any(),
                );
                m.insert(
                    "encoding_issues".into(),
                    c.encoding_issues.into_pyobject(py)?.unbind().into_any(),
                );
                Ok(m)
            })
            .transpose()
    }
    /// Uniqueness section: "duplicate_rows", "key_uniqueness",
    /// "high_cardinality_warning".
    #[getter]
    fn uniqueness(
        &self,
        py: Python<'_>,
    ) -> PyResult<Option<std::collections::HashMap<String, Py<PyAny>>>> {
        self.inner
            .uniqueness
            .as_ref()
            .map(|u| -> PyResult<_> {
                let mut m = std::collections::HashMap::new();
                m.insert(
                    "duplicate_rows".into(),
                    u.duplicate_rows.into_pyobject(py)?.unbind().into_any(),
                );
                m.insert(
                    "key_uniqueness".into(),
                    u.key_uniqueness.into_pyobject(py)?.unbind().into_any(),
                );
                m.insert(
                    "high_cardinality_warning".into(),
                    // bool's into_pyobject yields a borrowed handle to one of
                    // Python's bool singletons, so `to_owned` is needed before
                    // `unbind` (unlike the numeric conversions above).
                    u.high_cardinality_warning
                        .into_pyobject(py)?
                        .to_owned()
                        .unbind()
                        .into_any(),
                );
                Ok(m)
            })
            .transpose()
    }
    /// Accuracy section: "outlier_ratio", "range_violations",
    /// "negative_values_in_positive".
    #[getter]
    fn accuracy(
        &self,
        py: Python<'_>,
    ) -> PyResult<Option<std::collections::HashMap<String, Py<PyAny>>>> {
        self.inner
            .accuracy
            .as_ref()
            .map(|a| -> PyResult<_> {
                let mut m = std::collections::HashMap::new();
                m.insert(
                    "outlier_ratio".into(),
                    a.outlier_ratio.into_pyobject(py)?.unbind().into_any(),
                );
                m.insert(
                    "range_violations".into(),
                    a.range_violations.into_pyobject(py)?.unbind().into_any(),
                );
                m.insert(
                    "negative_values_in_positive".into(),
                    a.negative_values_in_positive
                        .into_pyobject(py)?
                        .unbind()
                        .into_any(),
                );
                Ok(m)
            })
            .transpose()
    }
    /// Timeliness section: "future_dates_count", "stale_data_ratio",
    /// "temporal_violations".
    #[getter]
    fn timeliness(
        &self,
        py: Python<'_>,
    ) -> PyResult<Option<std::collections::HashMap<String, Py<PyAny>>>> {
        self.inner
            .timeliness
            .as_ref()
            .map(|t| -> PyResult<_> {
                let mut m = std::collections::HashMap::new();
                m.insert(
                    "future_dates_count".into(),
                    t.future_dates_count.into_pyobject(py)?.unbind().into_any(),
                );
                m.insert(
                    "stale_data_ratio".into(),
                    t.stale_data_ratio.into_pyobject(py)?.unbind().into_any(),
                );
                m.insert(
                    "temporal_violations".into(),
                    t.temporal_violations.into_pyobject(py)?.unbind().into_any(),
                );
                Ok(m)
            })
            .transpose()
    }
    /// Overall quality score. Note: no #[getter], so this is exposed to
    /// Python as a callable method, not a property.
    fn overall_quality_score(&self) -> f64 {
        self.inner.overall_score()
    }
    /// Human-readable summary. Formats selected metrics with a "%" suffix;
    /// assumes the core accessors already return percentage-scaled values —
    /// TODO confirm against `QualityMetrics`.
    fn __str__(&self) -> String {
        format!(
            "DataQualityMetrics(score={:.1}%, completeness={:.1}%, consistency={:.1}%, uniqueness={:.1}%)",
            self.inner.overall_score(),
            self.inner.complete_records_ratio(),
            self.inner.data_type_consistency(),
            self.inner.key_uniqueness(),
        )
    }
}
/// Python-visible wrapper around a completed core `ProfileReport`.
#[pyclass(name = "ProfileReport")]
pub struct PyProfileReport {
    // Owned report; crate-visible so sibling modules can reach it.
    pub(crate) inner: ProfileReport,
}
impl PyProfileReport {
pub fn new(report: ProfileReport) -> Self {
Self { inner: report }
}
}
#[pymethods]
impl PyProfileReport {
    /// Identifier string of the profiled data source.
    #[getter]
    fn source(&self) -> String {
        self.inner.data_source.identifier()
    }
    /// Kind of source: "file", "dataframe", "stream", or "query".
    #[getter]
    fn source_type(&self) -> &str {
        match &self.inner.data_source {
            DataSource::File { .. } => "file",
            DataSource::DataFrame { .. } => "dataframe",
            DataSource::Stream { .. } => "stream",
            DataSource::Query { .. } => "query",
        }
    }
    /// Library that produced the dataframe; None for non-dataframe sources.
    #[getter]
    fn source_library(&self) -> Option<String> {
        if let DataSource::DataFrame { source_library, .. } = &self.inner.data_source {
            Some(source_library.to_string())
        } else {
            None
        }
    }
    /// In-memory size of a dataframe source, when known.
    #[getter]
    fn memory_bytes(&self) -> Option<u64> {
        if let DataSource::DataFrame { memory_bytes, .. } = &self.inner.data_source {
            *memory_bytes
        } else {
            None
        }
    }
    /// Number of rows the profiler consumed.
    #[getter]
    fn rows_processed(&self) -> usize {
        self.inner.execution.rows_processed
    }
    /// Number of columns detected in the source.
    #[getter]
    fn columns_detected(&self) -> usize {
        self.inner.execution.columns_detected
    }
    /// Scan duration in milliseconds.
    #[getter]
    fn scan_time_ms(&self) -> u128 {
        self.inner.execution.scan_time_ms
    }
    /// True when the profiler read the source to completion.
    #[getter]
    fn source_exhausted(&self) -> bool {
        self.inner.execution.source_exhausted
    }
    /// Why the scan stopped early, rendered as a short string;
    /// None when the scan was not truncated.
    #[getter]
    fn truncation_reason(&self) -> Option<String> {
        let reason = self.inner.execution.truncation_reason.as_ref()?;
        let rendered = match reason {
            TruncationReason::MaxRows(n) => format!("max_rows({})", n),
            TruncationReason::MaxBytes(n) => format!("max_bytes({})", n),
            TruncationReason::MemoryPressure => "memory_pressure".to_string(),
            TruncationReason::StopCondition(s) => format!("stop_condition({})", s),
            TruncationReason::StreamClosed => "stream_closed".to_string(),
            TruncationReason::Timeout => "timeout".to_string(),
        };
        Some(rendered)
    }
    /// Bytes read from the source, when tracked.
    #[getter]
    fn bytes_consumed(&self) -> Option<u64> {
        self.inner.execution.bytes_consumed
    }
    /// Throughput in rows per second, when tracked.
    #[getter]
    fn throughput_rows_sec(&self) -> Option<f64> {
        self.inner.execution.throughput_rows_sec
    }
    /// Peak memory usage in megabytes, when tracked.
    #[getter]
    fn memory_peak_mb(&self) -> Option<f64> {
        self.inner.execution.memory_peak_mb
    }
    /// Number of errors encountered during the scan.
    #[getter]
    fn error_count(&self) -> usize {
        self.inner.execution.error_count
    }
    /// Whether row sampling was applied.
    #[getter]
    fn sampling_applied(&self) -> bool {
        self.inner.execution.sampling_applied
    }
    /// Sampling ratio, when sampling was applied.
    #[getter]
    fn sampling_ratio(&self) -> Option<f64> {
        self.inner.execution.sampling_ratio
    }
    /// Per-column profiles converted to their Python-facing form.
    #[getter]
    fn column_profiles(&self) -> Vec<PyColumnProfile> {
        let mut converted = Vec::with_capacity(self.inner.column_profiles.len());
        for profile in &self.inner.column_profiles {
            converted.push(PyColumnProfile::from(profile));
        }
        converted
    }
    /// Quality metrics wrapper; None when no quality data was produced.
    #[getter]
    fn quality(&self) -> Option<PyDataQualityMetrics> {
        let quality = self.inner.quality.as_ref()?;
        Some(PyDataQualityMetrics::from(&quality.metrics))
    }
    /// Overall quality score, when available.
    #[getter]
    fn quality_score(&self) -> Option<f64> {
        self.inner.quality_score()
    }
    /// Serialize the full report to pretty-printed JSON.
    fn to_json(&self) -> PyResult<String> {
        match serde_json::to_string_pretty(&self.inner) {
            Ok(json) => Ok(json),
            Err(e) => Err(pyo3::exceptions::PyRuntimeError::new_err(format!(
                "JSON serialization failed: {}",
                e
            ))),
        }
    }
    fn __repr__(&self) -> String {
        let execution = &self.inner.execution;
        format!(
            "ProfileReport(source='{}', rows={}, columns={}, time={}ms, quality={:?})",
            self.inner.data_source.identifier(),
            execution.rows_processed,
            execution.columns_detected,
            execution.scan_time_ms,
            self.inner.quality_score(),
        )
    }
}