use std::collections::HashMap;
use polars::prelude::*;
use serde::{Deserialize, Serialize};
use tracing::{debug, instrument};
use crate::{
MeasurementKind,
column::{CanonicalColumnName, SourceColumnName},
error::{EtlError, EtlResult},
universe::MeasurementFragment,
};
/// Declarative description of an unpivot (wide → long) transformation.
///
/// Each entry in `inputs` names one wide source column. When executed, every input
/// yields one block of rows holding the subject, the time, the input's literal tag
/// values, and the source column's values. These blocks are stacked vertically.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UnpivotConfig {
/// Shape of the produced table: measurement column name, kind, and component (tag) columns.
pub output: UnpivotOutput,
/// Wide source columns to stack; each contributes one block of output rows.
pub inputs: Vec<UnpivotInput>,
/// Subject column, copied into every block under its canonical name.
pub subject: ColumnMapping,
/// Time column, copied into every block under its canonical name.
pub time: ColumnMapping,
}
/// Renames one column of the source DataFrame to its canonical output name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnMapping {
/// Column name as it appears in the source DataFrame.
pub source: SourceColumnName,
/// Name the column receives in the unpivoted output (applied via an alias).
pub canonical: CanonicalColumnName,
}
impl UnpivotConfig {
pub(crate) fn name(&self) -> &CanonicalColumnName {
&self.output.measurement
}
}
/// Describes the long-format output produced by an unpivot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UnpivotOutput {
/// Output column that receives the values read from each input's source column.
pub measurement: CanonicalColumnName,
/// Measurement kind, passed through to the resulting `MeasurementFragment`.
pub kind: MeasurementKind,
/// Tag column names; every input must provide exactly one value per component.
pub components: Vec<CanonicalColumnName>,
}
/// One wide source column to be stacked into the unpivoted output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UnpivotInput {
/// Column in the source DataFrame whose values become measurement rows.
pub source_column: SourceColumnName,
/// Literal value written into each component column for rows from this input,
/// keyed by component name.
pub tags: HashMap<CanonicalColumnName, String>,
}
impl UnpivotConfig {
    /// Starts building an unpivot that produces the measurement column `measurement`
    /// of the given `kind`.
    ///
    /// The returned [`UnpivotBuilder`] has no subject, time, components or inputs yet.
    pub fn creates(
        measurement: impl Into<CanonicalColumnName>,
        kind: MeasurementKind,
    ) -> UnpivotBuilder {
        let measurement: CanonicalColumnName = measurement.into();
        UnpivotBuilder {
            subject: None,
            time: None,
            components: Vec::new(),
            inputs: Vec::new(),
            kind,
            measurement,
        }
    }
}
/// Fluent builder for [`UnpivotConfig`], obtained from [`UnpivotConfig::creates`].
///
/// The subject and time mappings are required. `build_checked` returns an
/// `EtlError::Config` when either is missing or when the assembled config fails validation.
#[derive(Debug, Clone)]
pub struct UnpivotBuilder {
/// Canonical name of the measurement column to produce.
measurement: CanonicalColumnName,
/// Measurement kind for the produced column.
kind: MeasurementKind,
/// Declared component (tag) columns, in declaration order.
components: Vec<CanonicalColumnName>,
/// Source columns added so far, in insertion order.
inputs: Vec<UnpivotInput>,
/// `None` until `subject(..)` is called; required by `build_checked`.
subject: Option<ColumnMapping>,
/// `None` until `time(..)` is called; required by `build_checked`.
time: Option<ColumnMapping>,
}
impl UnpivotBuilder {
pub fn subject(
mut self,
source: impl Into<SourceColumnName>,
canonical: impl Into<CanonicalColumnName>,
) -> Self {
self.subject = Some(ColumnMapping {
source: source.into(),
canonical: canonical.into(),
});
self
}
pub fn time(
mut self,
source: impl Into<SourceColumnName>,
canonical: impl Into<CanonicalColumnName>,
) -> Self {
self.time = Some(ColumnMapping {
source: source.into(),
canonical: canonical.into(),
});
self
}
pub fn component(mut self, name: impl Into<CanonicalColumnName>) -> Self {
self.components.push(name.into());
self
}
pub fn from_source<S, I, K, V>(mut self, source_column: S, tags: I) -> Self
where
S: Into<SourceColumnName>,
I: IntoIterator<Item = (K, V)>,
K: Into<CanonicalColumnName>,
V: Into<String>,
{
let tags_map: HashMap<CanonicalColumnName, String> = tags
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect();
self.inputs.push(UnpivotInput {
source_column: source_column.into(),
tags: tags_map,
});
self
}
pub fn from_sources<I, S, T, K, V>(mut self, sources: I) -> Self
where
I: IntoIterator<Item = (S, T)>,
S: Into<SourceColumnName>,
T: IntoIterator<Item = (K, V)>,
K: Into<CanonicalColumnName>,
V: Into<String>,
{
for (source_column, tags) in sources {
let tags_map: HashMap<CanonicalColumnName, String> = tags
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect();
self.inputs.push(UnpivotInput {
source_column: source_column.into(),
tags: tags_map,
});
}
self
}
pub fn build(self) -> UnpivotConfig {
self.build_checked()
.expect("UnpivotConfig validation failed")
}
pub fn build_checked(self) -> EtlResult<UnpivotConfig> {
let subject = self.subject.ok_or_else(|| {
EtlError::Config(format!(
"Unpivot '{}': subject column mapping not specified. Use .subject(source, canonical)",
self.measurement
))
})?;
let time = self.time.ok_or_else(|| {
EtlError::Config(format!(
"Unpivot '{}': time column mapping not specified. Use .time(source, canonical)",
self.measurement
))
})?;
let config = UnpivotConfig {
output: UnpivotOutput {
measurement: self.measurement,
kind: self.kind,
components: self.components,
},
inputs: self.inputs,
subject,
time,
};
config.validate()?;
Ok(config)
}
}
impl From<UnpivotBuilder> for UnpivotConfig {
fn from(builder: UnpivotBuilder) -> Self {
builder.build()
}
}
impl UnpivotConfig {
pub fn measurement_name(&self) -> &CanonicalColumnName {
&self.output.measurement
}
pub fn kind(&self) -> MeasurementKind {
self.output.kind
}
pub fn source_subject(&self) -> &SourceColumnName {
&self.subject.source
}
pub fn canonical_subject(&self) -> &CanonicalColumnName {
&self.subject.canonical
}
pub fn source_time(&self) -> &SourceColumnName {
&self.time.source
}
pub fn canonical_time(&self) -> &CanonicalColumnName {
&self.time.canonical
}
pub fn source_columns(&self) -> impl Iterator<Item = &SourceColumnName> {
self.inputs.iter().map(|i| &i.source_column)
}
pub fn all_source_columns(&self) -> Vec<&SourceColumnName> {
let mut cols = vec![&self.subject.source, &self.time.source];
cols.extend(self.inputs.iter().map(|i| &i.source_column));
cols
}
pub fn component_columns(&self) -> &[CanonicalColumnName] {
&self.output.components
}
pub fn input_count(&self) -> usize {
self.inputs.len()
}
pub fn tag_value(
&self,
source: &SourceColumnName,
component: &CanonicalColumnName,
) -> Option<&str> {
self.inputs
.iter()
.find(|i| &i.source_column == source)
.and_then(|i| i.tags.get(component))
.map(|s| s.as_str())
}
pub fn validate(&self) -> EtlResult<()> {
if self.inputs.is_empty() {
return Err(EtlError::Config(format!(
"Unpivot '{}' has no source columns",
self.output.measurement
)));
}
for input in &self.inputs {
for component in &self.output.components {
if !input.tags.contains_key(component) {
return Err(EtlError::Config(format!(
"Unpivot '{}': source column '{}' missing tag for component '{}'",
self.output.measurement, input.source_column, component
)));
}
}
for tag_component in input.tags.keys() {
if !self.output.components.contains(tag_component) {
return Err(EtlError::Config(format!(
"Unpivot '{}': source column '{}' has tag '{}' but component not declared",
self.output.measurement, input.source_column, tag_component
)));
}
}
}
Ok(())
}
}
impl UnpivotConfig {
#[instrument(skip(self, source_df), fields(measurement = %self.output.measurement, inputs = self.inputs.len()))]
pub fn execute(
&self,
source_df: &DataFrame,
source_name: &str,
) -> EtlResult<MeasurementFragment> {
self.validate()?;
debug!(
source_columns = ?self.inputs.iter().map(|i| i.source_column.as_str()).collect::<Vec<_>>(),
components = ?self.output.components.iter().map(|c| c.as_str()).collect::<Vec<_>>(),
subject_source = %self.subject.source,
subject_canonical = %self.subject.canonical,
time_source = %self.time.source,
time_canonical = %self.time.canonical,
"Executing unpivot"
);
let mut fragments: Vec<DataFrame> = Vec::with_capacity(self.inputs.len());
for input in &self.inputs {
let mini_df = self.build_input_fragment(source_df, input)?;
fragments.push(mini_df);
}
let stacked = if fragments.len() == 1 {
fragments.remove(0)
} else {
concat(
fragments
.iter()
.map(|df| df.clone().lazy())
.collect::<Vec<_>>(),
UnionArgs::default(),
)?
.collect()?
};
let rows_before = stacked.height();
let stacked_height = stacked.height();
let filtered = stacked
.lazy()
.filter(col(self.output.measurement.as_str()).is_not_null())
.collect()?;
debug!(
rows_before = rows_before,
rows_after = filtered.height(),
rows_removed = stacked_height - filtered.height(),
"🦀 ✅ Unpivot Filtered null measurement values"
);
Ok(MeasurementFragment::new(
self.output.measurement.clone(),
source_name,
self.output.kind,
self.output.components.clone(),
filtered,
))
}
fn build_input_fragment(
&self,
source_df: &DataFrame,
input: &UnpivotInput,
) -> EtlResult<DataFrame> {
let measurement_name = self.output.measurement.as_str();
let source_col_name = input.source_column.as_str();
if source_df.column(source_col_name).is_err() {
return Err(EtlError::MissingColumn(format!(
"Unpivot '{}': source column '{}' not found in DataFrame. Available: {:?}",
measurement_name,
source_col_name,
source_df
.get_column_names()
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>()
)));
}
if source_df.column(self.subject.source.as_str()).is_err() {
return Err(EtlError::MissingColumn(format!(
"Unpivot '{}': subject column '{}' not found in DataFrame. Available: {:?}",
measurement_name,
self.subject.source,
source_df
.get_column_names()
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>()
)));
}
if source_df.column(self.time.source.as_str()).is_err() {
return Err(EtlError::MissingColumn(format!(
"Unpivot '{}': time column '{}' not found in DataFrame. Available: {:?}",
measurement_name,
self.time.source,
source_df
.get_column_names()
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>()
)));
}
let mut select_exprs: Vec<Expr> = Vec::new();
select_exprs.push(col(self.subject.source.as_str()).alias(self.subject.canonical.as_str()));
select_exprs.push(col(self.time.source.as_str()).alias(self.time.canonical.as_str()));
for component in &self.output.components {
let tag_value = input.tags.get(component).ok_or_else(|| {
EtlError::Config(format!(
"Unpivot '{}': missing tag '{}' for source column '{}'",
measurement_name, component, source_col_name
))
})?;
select_exprs.push(lit(tag_value.clone()).alias(component.as_str()));
}
select_exprs.push(col(source_col_name).alias(measurement_name));
let fragment = source_df.clone().lazy().select(select_exprs).collect()?;
Ok(fragment)
}
pub fn execute_to_df(&self, source_df: &DataFrame) -> EtlResult<DataFrame> {
self.execute(source_df, "unknown")?.materialize()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_unpivot_config_builder() {
        let config = UnpivotConfig::creates("engine_status", MeasurementKind::Categorical)
            .subject("station_name", "station_name")
            .time("observation_time", "timestamp")
            .component("engine")
            .from_sources([
                ("engine_1", [("engine", "1")]),
                ("engine_2", [("engine", "2")]),
                ("engine_3", [("engine", "3")]),
            ])
            .build();
        assert_eq!(config.measurement_name().as_str(), "engine_status");
        assert_eq!(config.source_subject().as_str(), "station_name");
        assert_eq!(config.canonical_subject().as_str(), "station_name");
        assert_eq!(config.source_time().as_str(), "observation_time");
        assert_eq!(config.canonical_time().as_str(), "timestamp");
        assert_eq!(config.input_count(), 3);
        assert_eq!(config.component_columns().len(), 1);
    }

    #[test]
    fn test_unpivot_config_missing_subject() {
        let result = UnpivotConfig::creates("engine_status", MeasurementKind::Categorical)
            .time("observation_time", "timestamp")
            .component("engine")
            .from_source("engine_1", [("engine", "1")])
            .build_checked();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("subject"));
    }

    #[test]
    fn test_unpivot_config_missing_time() {
        let result = UnpivotConfig::creates("engine_status", MeasurementKind::Categorical)
            .subject("station_name", "station_name")
            .component("engine")
            .from_source("engine_1", [("engine", "1")])
            .build_checked();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("time"));
    }

    #[test]
    fn test_unpivot_config_no_inputs() {
        let result = UnpivotConfig::creates("engine_status", MeasurementKind::Categorical)
            .subject("station_name", "station_name")
            .time("observation_time", "timestamp")
            .component("engine")
            .build_checked();
        assert!(result.unwrap_err().to_string().contains("no source columns"));
    }

    #[test]
    fn test_unpivot_config_missing_tag() {
        let result = UnpivotConfig::creates("engine_status", MeasurementKind::Categorical)
            .subject("station_name", "station_name")
            .time("observation_time", "timestamp")
            .component("engine")
            .from_source("engine_1", Vec::<(&str, &str)>::new())
            .build_checked();
        assert!(result.unwrap_err().to_string().contains("missing tag"));
    }

    #[test]
    fn test_unpivot_config_undeclared_tag() {
        let result = UnpivotConfig::creates("engine_status", MeasurementKind::Categorical)
            .subject("station_name", "station_name")
            .time("observation_time", "timestamp")
            .from_source("engine_1", [("engine", "1")])
            .build_checked();
        assert!(result.unwrap_err().to_string().contains("not declared"));
    }

    #[test]
    fn test_tag_value_lookup() {
        let config = UnpivotConfig::creates("engine_status", MeasurementKind::Categorical)
            .subject("station_name", "station_name")
            .time("observation_time", "timestamp")
            .component("engine")
            .from_sources([
                ("engine_1", [("engine", "1")]),
                ("engine_2", [("engine", "2")]),
            ])
            .build();
        let source: SourceColumnName = "engine_2".into();
        let component: CanonicalColumnName = "engine".into();
        assert_eq!(config.tag_value(&source, &component), Some("2"));
        let unknown: SourceColumnName = "engine_9".into();
        assert_eq!(config.tag_value(&unknown, &component), None);
    }

    #[test]
    fn test_all_source_columns() {
        let config = UnpivotConfig::creates("engine_status", MeasurementKind::Categorical)
            .subject("station_name", "station_name")
            .time("observation_time", "timestamp")
            .component("engine")
            .from_sources([
                ("engine_1", [("engine", "1")]),
                ("engine_2", [("engine", "2")]),
            ])
            .build();
        let all_cols = config.all_source_columns();
        assert_eq!(all_cols.len(), 4);
        assert!(all_cols.iter().any(|c| c.as_str() == "station_name"));
        assert!(all_cols.iter().any(|c| c.as_str() == "observation_time"));
        assert!(all_cols.iter().any(|c| c.as_str() == "engine_1"));
        assert!(all_cols.iter().any(|c| c.as_str() == "engine_2"));
    }
}