use std::collections::HashMap;
use polars::prelude::*;
use serde::{Deserialize, Serialize};
use super::null_value::NullValue;
use crate::{
chart_hints::ChartHints,
column::CanonicalColumnName,
error::{EtlError, EtlResult},
unit::QualityUnit,
};
/// Declarative recipe for deriving one string-valued quality column from another.
///
/// Each entry in `mapping` names a codomain label and lists the values of the
/// `domain` column that collapse into it. `compute_derived_quality` applies the
/// recipe to a data frame; `to_quality_unit` describes the derived column as a
/// `QualityUnit`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityDerivation {
/// Name of the derived quality; also used as the output column name.
pub name: CanonicalColumnName,
/// Name of the existing column whose values are being grouped.
pub domain: CanonicalColumnName,
/// Codomain label -> domain values that map to it.
pub mapping: HashMap<String, Vec<String>>,
/// How domain values absent from `mapping` are handled. Falls back to
/// `UnmappedPolicy::default()` when the field is missing on deserialization.
#[serde(default)]
pub unmapped_policy: UnmappedPolicy,
/// Optional chart hints copied onto the unit built by `to_quality_unit`.
/// Left out of serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub chart_hints: Option<ChartHints>,
}
/// What `compute_derived_quality` does with a domain value (or a null) that no
/// codomain in the mapping claims.
///
/// Serialized with an internal `"type"` tag and snake_case variant names, e.g.
/// in JSON `{"type": "label", "label": "other"}` or `{"type": "null"}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum UnmappedPolicy {
/// Emit `label` in place of the missing codomain. `to_quality_unit` also
/// registers the label as the unit's null-value extension.
Label { label: String },
/// Emit a null in the derived column.
Null,
/// Fail the derivation, reporting the distinct unmapped values.
Error,
}
impl Default for UnmappedPolicy {
    /// Unmapped values are labelled `"other"` unless a policy is set explicitly.
    fn default() -> Self {
        let label = String::from("other");
        Self::Label { label }
    }
}
impl QualityDerivation {
pub fn new(
name: impl Into<CanonicalColumnName>,
domain: impl Into<CanonicalColumnName>,
) -> Self {
Self {
name: name.into(),
domain: domain.into(),
mapping: HashMap::new(),
unmapped_policy: UnmappedPolicy::default(),
chart_hints: None,
}
}
pub fn map(mut self, codomain: impl Into<String>, domain_values: Vec<String>) -> Self {
self.mapping.insert(codomain.into(), domain_values);
self
}
pub fn with_unmapped_policy(mut self, policy: UnmappedPolicy) -> Self {
self.unmapped_policy = policy;
self
}
pub fn with_unmapped_label(mut self, label: impl Into<String>) -> Self {
self.unmapped_policy = UnmappedPolicy::Label {
label: label.into(),
};
self
}
pub fn with_chart_hints(mut self, hints: ChartHints) -> Self {
self.chart_hints = Some(hints);
self
}
pub fn to_quality_unit(&self, subject: &CanonicalColumnName) -> QualityUnit {
let mut unit = QualityUnit::new(subject.as_str(), self.name.as_str());
if let Some(hints) = &self.chart_hints {
unit = unit.with_chart_hints(hints.clone());
}
if let UnmappedPolicy::Label { label } = &self.unmapped_policy {
unit = unit.with_null_extension(NullValue::string(label.as_str()));
}
unit
}
pub fn declared_domain_values(&self) -> Vec<&str> {
let mut out: Vec<&str> = self
.mapping
.values()
.flat_map(|v| v.iter().map(|s| s.as_str()))
.collect();
out.sort();
out.dedup();
out
}
}
pub fn compute_derived_quality(
domain_df: &DataFrame,
derivation: &QualityDerivation,
subject_col: &str,
) -> EtlResult<DataFrame> {
let domain_col = derivation.domain.as_str();
let derived_col = derivation.name.as_str();
domain_df.column(subject_col).map_err(|e| {
EtlError::DataProcessing(format!(
"compute_derived_quality: subject column '{subject_col}' missing: {e}"
))
})?;
let domain_series = domain_df.column(domain_col).map_err(|e| {
EtlError::DataProcessing(format!(
"compute_derived_quality: domain column '{domain_col}' missing: {e}"
))
})?;
let mut reverse: HashMap<&str, &str> = HashMap::new();
for (codomain, domain_values) in &derivation.mapping {
for dv in domain_values {
if let Some(prev) = reverse.insert(dv.as_str(), codomain.as_str())
&& prev != codomain.as_str()
{
return Err(EtlError::Config(format!(
"QualityDerivation '{}': domain value '{}' mapped to both \
'{}' and '{}' — codomains must partition the domain",
derived_col, dv, prev, codomain,
)));
}
}
}
let domain_str = domain_series.cast(&DataType::String).map_err(|e| {
EtlError::DataProcessing(format!(
"compute_derived_quality: cast '{domain_col}' to String: {e}"
))
})?;
let domain_ca = domain_str
.as_materialized_series()
.str()
.map_err(|e| {
EtlError::DataProcessing(format!(
"compute_derived_quality: '{domain_col}' not String after cast: {e}"
))
})?
.clone();
let mut derived: Vec<Option<String>> = Vec::with_capacity(domain_ca.len());
let mut unmapped_values: Vec<String> = Vec::new();
for opt in domain_ca.iter() {
match opt {
Some(v) => match reverse.get(v) {
Some(codomain) => derived.push(Some((*codomain).to_string())),
None => match &derivation.unmapped_policy {
UnmappedPolicy::Label { label } => {
derived.push(Some(label.clone()));
}
UnmappedPolicy::Null => derived.push(None),
UnmappedPolicy::Error => {
derived.push(None);
unmapped_values.push(v.to_string());
}
},
},
None => match &derivation.unmapped_policy {
UnmappedPolicy::Label { label } => derived.push(Some(label.clone())),
UnmappedPolicy::Null => derived.push(None),
UnmappedPolicy::Error => {
derived.push(None);
unmapped_values.push("<null>".into());
}
},
}
}
if !unmapped_values.is_empty() {
unmapped_values.sort();
unmapped_values.dedup();
return Err(EtlError::DataProcessing(format!(
"QualityDerivation '{derived_col}': {} domain values have no codomain \
(policy = Error). Unmapped values: {:?}",
unmapped_values.len(),
unmapped_values,
)));
}
let subject_col_data = domain_df
.column(subject_col)
.expect("checked above")
.clone();
let derived_series: Vec<Option<&str>> = derived.iter().map(|o| o.as_deref()).collect();
DataFrame::new(vec![
subject_col_data,
Column::new(derived_col.into(), derived_series),
])
.map_err(|e| {
EtlError::DataProcessing(format!(
"compute_derived_quality: assemble output frame: {e}"
))
})
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Name of the subject column used by every fixture frame.
    const SUBJECT: &str = "subject";

    /// Builds a frame with a `SUBJECT` column and a nullable `parish` column.
    fn domain_df(subjects: &[&str], parishes: &[Option<&str>]) -> DataFrame {
        let columns = vec![
            Column::new(SUBJECT.into(), subjects),
            Column::new("parish".into(), parishes),
        ];
        DataFrame::new(columns).unwrap()
    }

    /// Fixture: `region` derived from `parish` with only Orleans mapped.
    fn orleans_only() -> QualityDerivation {
        QualityDerivation::new("region", "parish")
            .map("SE Louisiana", vec![String::from("Orleans")])
    }

    /// Fixture frame: one mapped parish followed by one unmapped parish.
    fn orleans_and_plaquemines() -> DataFrame {
        domain_df(&["A", "B"], &[Some("Orleans"), Some("Plaquemines")])
    }

    /// Extracts the derived `region` column as a string array.
    fn region_of(out: &DataFrame) -> StringChunked {
        out.column("region").unwrap().str().unwrap().clone()
    }

    #[test]
    fn maps_domain_values_through_codomain_mapping() {
        let parishes = [
            Some("Orleans"),
            Some("Jefferson"),
            Some("Lafourche"),
            Some("Terrebonne"),
        ];
        let frame = domain_df(&["A", "B", "C", "D"], &parishes);
        let recipe = QualityDerivation::new("region", "parish")
            .map(
                "SE Louisiana",
                vec![String::from("Orleans"), String::from("Jefferson")],
            )
            .map(
                "Bayou",
                vec![String::from("Lafourche"), String::from("Terrebonne")],
            );

        let out = compute_derived_quality(&frame, &recipe, SUBJECT).unwrap();
        assert_eq!(out.height(), 4);

        let region = region_of(&out);
        let expected = ["SE Louisiana", "SE Louisiana", "Bayou", "Bayou"];
        for (row, want) in expected.into_iter().enumerate() {
            assert_eq!(region.get(row), Some(want));
        }
    }

    #[test]
    fn unmapped_default_label_is_other() {
        let frame = orleans_and_plaquemines();
        let recipe = orleans_only();
        let out = compute_derived_quality(&frame, &recipe, SUBJECT).unwrap();
        let region = region_of(&out);
        assert_eq!(region.get(0), Some("SE Louisiana"));
        assert_eq!(region.get(1), Some("other"));
    }

    #[test]
    fn unmapped_custom_label() {
        let frame = orleans_and_plaquemines();
        let recipe = orleans_only().with_unmapped_label("Outside study area");
        let out = compute_derived_quality(&frame, &recipe, SUBJECT).unwrap();
        let region = region_of(&out);
        assert_eq!(region.get(1), Some("Outside study area"));
    }

    #[test]
    fn unmapped_null_policy() {
        let frame = orleans_and_plaquemines();
        let recipe = orleans_only().with_unmapped_policy(UnmappedPolicy::Null);
        let out = compute_derived_quality(&frame, &recipe, SUBJECT).unwrap();
        let region = region_of(&out);
        assert_eq!(region.get(0), Some("SE Louisiana"));
        assert_eq!(
            region.get(1),
            None,
            "Plaquemines is unmapped, null policy → null"
        );
    }

    #[test]
    fn unmapped_error_policy_raises_with_value_list() {
        let frame = domain_df(
            &["A", "B", "C"],
            &[Some("Orleans"), Some("Plaquemines"), Some("St. Tammany")],
        );
        let recipe = orleans_only().with_unmapped_policy(UnmappedPolicy::Error);
        let failure = compute_derived_quality(&frame, &recipe, SUBJECT).unwrap_err();
        let msg = failure.to_string();
        assert!(
            msg.contains("Plaquemines"),
            "msg must list unmapped values: {msg}"
        );
        assert!(
            msg.contains("St. Tammany"),
            "msg must list all unmapped values: {msg}"
        );
    }

    #[test]
    fn null_domain_value_follows_unmapped_policy() {
        let frame = domain_df(&["A", "B"], &[Some("Orleans"), None]);
        let recipe = orleans_only();
        let out = compute_derived_quality(&frame, &recipe, SUBJECT).unwrap();
        let region = region_of(&out);
        assert_eq!(region.get(0), Some("SE Louisiana"));
        assert_eq!(region.get(1), Some("other"));
    }

    #[test]
    fn conflicting_domain_to_two_codomains_is_an_error() {
        let frame = domain_df(&["A"], &[Some("Orleans")]);
        // Orleans is claimed by two codomains, which breaks the partition.
        let recipe = orleans_only().map("Metro", vec![String::from("Orleans")]);
        let failure = compute_derived_quality(&frame, &recipe, SUBJECT).unwrap_err();
        let msg = failure.to_string();
        assert!(msg.contains("must partition"), "msg = {msg}");
    }

    #[test]
    fn missing_domain_column_errors() {
        let subject_only = vec![Column::new(SUBJECT.into(), &["A", "B"])];
        let frame = DataFrame::new(subject_only).unwrap();
        let recipe = QualityDerivation::new("region", "parish").map("X", vec![String::from("Y")]);
        let failure = compute_derived_quality(&frame, &recipe, SUBJECT).unwrap_err();
        let msg = failure.to_string();
        assert!(msg.contains("parish"), "msg = {msg}");
    }

    #[test]
    fn to_quality_unit_carries_null_extension_from_label_policy() {
        let recipe = orleans_only().with_unmapped_label("Outside");
        let subject = CanonicalColumnName::new("station");
        let unit = recipe.to_quality_unit(&subject);
        assert_eq!(unit.name.as_str(), "region");
        assert_eq!(unit.subject.as_str(), "station");
        assert!(unit.null_value_extension.is_some());
    }

    #[test]
    fn declared_domain_values_is_deduplicated_and_sorted() {
        let recipe = QualityDerivation::new("region", "parish")
            .map("A", vec![String::from("Orleans"), String::from("Jefferson")])
            .map("B", vec![String::from("Orleans"), String::from("Lafourche")]);
        let declared = recipe.declared_domain_values();
        assert_eq!(declared, vec!["Jefferson", "Lafourche", "Orleans"]);
    }
}