use std::collections::HashMap;
use polars::prelude::*;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use crate::{
MeasurementKind,
aggregation::Aggregate,
column::CanonicalColumnName,
error::{EtlError, EtlResult},
polars_fns::SignalPolicyStats,
universe::measurement_storage::FragmentRef,
};
/// One per-source piece of an ETL unit, before pieces of the same unit are combined.
///
/// The two variants are accumulated and composed separately: measurements are
/// validated against subject, time and value columns, qualities against subject
/// and value columns only.
#[derive(Debug, Clone)]
pub enum EtlUnitFragment {
/// A fragment of a measurement unit.
Measurement(MeasurementFragment),
/// A fragment of a quality unit.
Quality(QualityFragment),
}
impl EtlUnitFragment {
pub fn name(&self) -> &CanonicalColumnName {
match self {
EtlUnitFragment::Measurement(m) => &m.unit_name,
EtlUnitFragment::Quality(q) => &q.unit_name,
}
}
pub fn source_name(&self) -> &str {
match self {
EtlUnitFragment::Measurement(m) => &m.source_name,
EtlUnitFragment::Quality(q) => &q.source_name,
}
}
pub fn height(&self) -> usize {
match self {
EtlUnitFragment::Measurement(m) => m.fragment.height(),
EtlUnitFragment::Quality(q) => q.data.height(),
}
}
pub fn is_measurement(&self) -> bool {
matches!(self, EtlUnitFragment::Measurement(_))
}
pub fn is_quality(&self) -> bool {
matches!(self, EtlUnitFragment::Quality(_))
}
pub fn as_measurement(&self) -> Option<&MeasurementFragment> {
match self {
EtlUnitFragment::Measurement(m) => Some(m),
_ => None,
}
}
pub fn as_quality(&self) -> Option<&QualityFragment> {
match self {
EtlUnitFragment::Quality(q) => Some(q),
_ => None,
}
}
}
/// The data one source contributes to a measurement unit.
#[derive(Debug, Clone)]
pub struct MeasurementFragment {
/// Canonical unit name; `validate` also requires a value column with this name.
pub unit_name: CanonicalColumnName,
/// Name of the source that produced the fragment (used in error messages).
pub source_name: String,
/// Measurement kind; its default aggregation is used when components are crushed.
pub kind: MeasurementKind,
/// Component columns this fragment carries; `validate` requires each to be present.
pub components: Vec<CanonicalColumnName>,
/// The fragment's data: an in-memory frame (`FragmentRef::Materialized`) or
/// another kind of storage reference.
pub fragment: FragmentRef,
/// Optional signal-policy statistics; `None` unless set via
/// `with_signal_policy_stats`.
pub signal_policy_stats: Option<SignalPolicyStats>,
}
impl MeasurementFragment {
    /// Creates a fragment backed by an in-memory `DataFrame`.
    ///
    /// Equivalent to [`MeasurementFragment::with_ref`] with
    /// `FragmentRef::Materialized(data)`; `signal_policy_stats` starts as `None`.
    pub fn new(
        unit_name: impl Into<CanonicalColumnName>,
        source_name: impl Into<String>,
        kind: MeasurementKind,
        components: Vec<CanonicalColumnName>,
        data: DataFrame,
    ) -> Self {
        Self::with_ref(
            unit_name,
            source_name,
            kind,
            components,
            FragmentRef::Materialized(data),
        )
    }

    /// Creates a fragment from an existing storage reference.
    ///
    /// `signal_policy_stats` starts as `None`.
    pub fn with_ref(
        unit_name: impl Into<CanonicalColumnName>,
        source_name: impl Into<String>,
        kind: MeasurementKind,
        components: Vec<CanonicalColumnName>,
        fragment: FragmentRef,
    ) -> Self {
        Self {
            unit_name: unit_name.into(),
            source_name: source_name.into(),
            kind,
            components,
            fragment,
            signal_policy_stats: None,
        }
    }

    /// Returns the fragment's data as a `DataFrame`.
    ///
    /// # Errors
    /// Propagates any error from `FragmentRef::as_dataframe`, converted to `EtlError`.
    pub fn materialize(&self) -> EtlResult<DataFrame> {
        self.fragment.as_dataframe().map_err(Into::into)
    }

    /// Builder-style setter for the signal-policy statistics.
    pub fn with_signal_policy_stats(mut self, stats: Option<SignalPolicyStats>) -> Self {
        self.signal_policy_stats = stats;
        self
    }

    /// Whether `component` is listed among this fragment's components.
    pub fn has_component(&self, component: &CanonicalColumnName) -> bool {
        self.components.contains(component)
    }

    /// Column names of the fragment's data.
    ///
    /// Best effort: if a non-materialized fragment cannot be materialized the
    /// result is an empty list. Use [`MeasurementFragment::validate`] when the
    /// failure itself matters.
    pub fn columns(&self) -> Vec<String> {
        self.try_columns().unwrap_or_default()
    }

    /// Column names of the fragment's data, surfacing materialization failures.
    fn try_columns(&self) -> EtlResult<Vec<String>> {
        match &self.fragment {
            // Already in memory: read the names without copying the frame.
            FragmentRef::Materialized(df) => Ok(Self::column_names(df)),
            // Any other storage has to be materialized to learn its columns.
            _ => self.materialize().map(|df| Self::column_names(&df)),
        }
    }

    /// Owned copies of a frame's column names.
    fn column_names(df: &DataFrame) -> Vec<String> {
        df.get_column_names()
            .iter()
            .map(|s| s.to_string())
            .collect()
    }

    /// Checks that the data contains the subject column, the time column, the
    /// value column (named after the unit) and every declared component column.
    ///
    /// # Errors
    /// * `EtlError::MissingColumn` naming the first missing column, checked in
    ///   the order subject, time, value, components.
    /// * The underlying materialization error when a non-materialized fragment
    ///   cannot be read. Previously this surfaced as a misleading
    ///   "missing subject column" error.
    pub fn validate(
        &self,
        subject_col: &CanonicalColumnName,
        time_col: &CanonicalColumnName,
    ) -> EtlResult<()> {
        let cols = self.try_columns()?;
        let has_column = |name: &str| cols.iter().any(|c| c == name);

        if !has_column(subject_col.as_str()) {
            return Err(EtlError::MissingColumn(format!(
                "MeasurementFragment '{}' from '{}' missing subject column '{}'",
                self.unit_name, self.source_name, subject_col
            )));
        }
        if !has_column(time_col.as_str()) {
            return Err(EtlError::MissingColumn(format!(
                "MeasurementFragment '{}' from '{}' missing time column '{}'",
                self.unit_name, self.source_name, time_col
            )));
        }
        if !has_column(self.unit_name.as_str()) {
            return Err(EtlError::MissingColumn(format!(
                "MeasurementFragment '{}' from '{}' missing value column",
                self.unit_name, self.source_name
            )));
        }
        if let Some(comp) = self
            .components
            .iter()
            .find(|comp| !has_column(comp.as_str()))
        {
            return Err(EtlError::MissingColumn(format!(
                "MeasurementFragment '{}' from '{}' missing component column '{}'",
                self.unit_name, self.source_name, comp
            )));
        }
        Ok(())
    }
}
impl From<MeasurementFragment> for EtlUnitFragment {
fn from(m: MeasurementFragment) -> Self {
EtlUnitFragment::Measurement(m)
}
}
/// The data one source contributes to a quality unit.
#[derive(Debug, Clone)]
pub struct QualityFragment {
/// Canonical unit name; `validate` also requires a value column with this name.
pub unit_name: CanonicalColumnName,
/// Name of the source that produced the fragment (used in error messages).
pub source_name: String,
/// The fragment's data, held in memory.
pub data: DataFrame,
}
impl QualityFragment {
    /// Creates a quality fragment for `unit_name` coming from `source_name`.
    pub fn new(
        unit_name: impl Into<CanonicalColumnName>,
        source_name: impl Into<String>,
        data: DataFrame,
    ) -> Self {
        let unit_name = unit_name.into();
        let source_name = source_name.into();
        Self {
            unit_name,
            source_name,
            data,
        }
    }

    /// Owned copies of the data frame's column names.
    pub fn columns(&self) -> Vec<String> {
        let mut names = Vec::new();
        for name in self.data.get_column_names().iter() {
            names.push(name.to_string());
        }
        names
    }

    /// Checks that the data contains the subject column and the value column
    /// (named after the unit).
    ///
    /// # Errors
    /// `EtlError::MissingColumn` for the first missing column, subject first.
    pub fn validate(&self, subject_col: &CanonicalColumnName) -> EtlResult<()> {
        let present = self.columns();
        let lacks = |wanted: &str| !present.iter().any(|c| c == wanted);

        if lacks(subject_col.as_str()) {
            return Err(EtlError::MissingColumn(format!(
                "QualityFragment '{}' from '{}' missing subject column '{}'",
                self.unit_name, self.source_name, subject_col
            )));
        }
        if lacks(self.unit_name.as_str()) {
            return Err(EtlError::MissingColumn(format!(
                "QualityFragment '{}' from '{}' missing value column",
                self.unit_name, self.source_name
            )));
        }
        Ok(())
    }
}
impl From<QualityFragment> for EtlUnitFragment {
fn from(q: QualityFragment) -> Self {
EtlUnitFragment::Quality(q)
}
}
/// Collects fragments and groups them by unit name, keeping measurements and
/// qualities in separate maps.
#[derive(Debug, Default)]
pub struct FragmentAccumulator {
/// Measurement fragments keyed by unit name, in insertion order per unit.
pub measurements: HashMap<CanonicalColumnName, Vec<MeasurementFragment>>,
/// Quality fragments keyed by unit name, in insertion order per unit.
pub qualities: HashMap<CanonicalColumnName, Vec<QualityFragment>>,
}
impl FragmentAccumulator {
    /// Creates an empty accumulator.
    pub fn new() -> Self {
        Self {
            measurements: HashMap::new(),
            qualities: HashMap::new(),
        }
    }

    /// Adds a fragment, routing it to the map matching its variant.
    pub fn add(&mut self, fragment: EtlUnitFragment) {
        match fragment {
            EtlUnitFragment::Quality(quality) => self.add_quality(quality),
            EtlUnitFragment::Measurement(measurement) => self.add_measurement(measurement),
        }
    }

    /// Appends a measurement fragment to the list for its unit.
    pub fn add_measurement(&mut self, fragment: MeasurementFragment) {
        let key = fragment.unit_name.clone();
        let bucket = self.measurements.entry(key).or_default();
        bucket.push(fragment);
    }

    /// Appends a quality fragment to the list for its unit.
    pub fn add_quality(&mut self, fragment: QualityFragment) {
        let key = fragment.unit_name.clone();
        let bucket = self.qualities.entry(key).or_default();
        bucket.push(fragment);
    }

    /// Adds every fragment yielded by `fragments`.
    pub fn add_all(&mut self, fragments: impl IntoIterator<Item = EtlUnitFragment>) {
        fragments
            .into_iter()
            .for_each(|fragment| self.add(fragment));
    }

    /// Names of all measurement units seen so far (map order, i.e. unspecified).
    pub fn measurement_names(&self) -> impl Iterator<Item = &CanonicalColumnName> {
        self.measurements.keys()
    }

    /// Names of all quality units seen so far (map order, i.e. unspecified).
    pub fn quality_names(&self) -> impl Iterator<Item = &CanonicalColumnName> {
        self.qualities.keys()
    }

    /// Fragments collected for the measurement unit `name`, if any.
    pub fn get_measurement(&self, name: &CanonicalColumnName) -> Option<&Vec<MeasurementFragment>> {
        self.measurements.get(name)
    }

    /// Fragments collected for the quality unit `name`, if any.
    pub fn get_quality(&self, name: &CanonicalColumnName) -> Option<&Vec<QualityFragment>> {
        self.qualities.get(name)
    }

    /// Number of distinct measurement units.
    pub fn measurement_count(&self) -> usize {
        self.measurements.len()
    }

    /// Number of distinct quality units.
    pub fn quality_count(&self) -> usize {
        self.qualities.len()
    }

    /// Total number of fragments across all units of both kinds.
    pub fn total_fragments(&self) -> usize {
        let measurement_total: usize = self.measurements.values().map(Vec::len).sum();
        let quality_total: usize = self.qualities.values().map(Vec::len).sum();
        measurement_total + quality_total
    }

    /// `true` when no fragment of either kind has been added.
    pub fn is_empty(&self) -> bool {
        !(!self.measurements.is_empty() || !self.qualities.is_empty())
    }

    /// Consumes the accumulator and returns `(measurements, qualities)`.
    pub fn into_parts(
        self,
    ) -> (
        HashMap<CanonicalColumnName, Vec<MeasurementFragment>>,
        HashMap<CanonicalColumnName, Vec<QualityFragment>>,
    ) {
        let Self {
            measurements,
            qualities,
        } = self;
        (measurements, qualities)
    }
}
/// Record of a component that was not present in every fragment of a
/// measurement and was therefore aggregated away ("crushed") during stacking.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrushedComponent {
/// Unit name of the measurement the component belonged to.
pub measurement_name: CanonicalColumnName,
/// Name of the component that was crushed.
pub component_name: CanonicalColumnName,
/// Source names of the fragments that carried the component.
pub input_units_with: Vec<String>,
/// Source names of the fragments that did not carry the component.
pub input_units_without: Vec<String>,
/// Aggregation recorded for the crush (the measurement kind's default).
pub aggregation: Aggregate,
}
/// A measurement unit after all of its per-source fragments have been combined.
#[derive(Debug, Clone)]
pub struct ComposedMeasurement {
/// Canonical unit name (also the value column name).
pub name: CanonicalColumnName,
/// Measurement kind, taken from the first fragment.
pub kind: MeasurementKind,
/// Components that survive composition.
pub components: Vec<CanonicalColumnName>,
/// Source name of every input fragment, in input order.
pub sources: Vec<String>,
/// Components that had to be aggregated away; empty for a single fragment.
pub crushed_components: Vec<CrushedComponent>,
/// The composed data: the original reference for a single fragment, a
/// materialized stacked frame otherwise.
pub fragment: FragmentRef,
/// Signal-policy statistics of the fragments that carried any.
pub signal_policy_stats: Vec<SignalPolicyStats>,
}
impl ComposedMeasurement {
    /// Combines all fragments of one measurement unit into a single measurement.
    ///
    /// * A single fragment is passed through unchanged: its storage reference is
    ///   reused and nothing is materialized or stacked.
    /// * With several fragments, components carried by every fragment are kept.
    ///   Any other component is "crushed": fragments carrying it are aggregated
    ///   over it with the kind's default aggregation. All frames are then
    ///   projected to subject, time, kept components and value, and concatenated.
    ///
    /// The kind is taken from the first fragment.
    ///
    /// # Errors
    /// * `EtlError::Config` when `fragments` is empty or unit names differ.
    /// * Any error from `MeasurementFragment::validate`, from materializing a
    ///   fragment, or from the Polars queries.
    pub fn from_fragments(
        fragments: Vec<MeasurementFragment>,
        subject_col: &CanonicalColumnName,
        time_col: &CanonicalColumnName,
    ) -> EtlResult<Self> {
        if fragments.is_empty() {
            return Err(EtlError::Config(
                "Cannot create ComposedMeasurement from empty fragments".into(),
            ));
        }
        let unit_name = fragments[0].unit_name.clone();
        let kind = fragments[0].kind;
        for frag in &fragments {
            if frag.unit_name != unit_name {
                return Err(EtlError::Config(format!(
                    "Fragment unit mismatch: expected '{}', got '{}'",
                    unit_name, frag.unit_name
                )));
            }
            frag.validate(subject_col, time_col)?;
        }

        let sources: Vec<String> = fragments.iter().map(|f| f.source_name.clone()).collect();
        let signal_policy_stats: Vec<SignalPolicyStats> = fragments
            .iter()
            .filter_map(|f| f.signal_policy_stats.clone())
            .collect();

        if fragments.len() == 1 {
            let frag = fragments
                .into_iter()
                .next()
                .expect("fragment count was just checked to be one");
            debug!(
                unit = %unit_name,
                source = ?sources,
                storage = ?frag.fragment.storage_description().kind,
                "Single fragment — passing through (no stacking)"
            );
            return Ok(Self {
                name: unit_name,
                kind,
                components: frag.components,
                sources,
                crushed_components: Vec::new(),
                fragment: frag.fragment,
                signal_policy_stats,
            });
        }

        let (final_components, crushed_components, processed_dfs) =
            Self::analyze_components(&fragments, subject_col, time_col, kind)?;
        debug!(
            unit = %unit_name,
            sources = ?sources,
            components = final_components.len(),
            crushed = crushed_components.len(),
            signal_policy_stats = signal_policy_stats.len(),
            "Stacking measurement fragments"
        );
        let stacked = Self::stack_dataframes(
            processed_dfs,
            subject_col,
            time_col,
            &final_components,
            &unit_name,
        )?;
        Ok(Self {
            name: unit_name,
            kind,
            components: final_components,
            sources,
            crushed_components,
            fragment: FragmentRef::Materialized(stacked),
            signal_policy_stats,
        })
    }

    /// Decides which components survive stacking and prepares one frame per
    /// fragment.
    ///
    /// Returns `(kept components, crushed component records, frames)`. A
    /// component is kept only when every fragment lists it. The decision is made
    /// per fragment rather than per distinct source name, so two fragments
    /// sharing a source name cannot cause a common component to be crushed.
    /// Kept components, crushed records and the source lists inside each record
    /// are sorted by name so the result does not depend on hash iteration order.
    fn analyze_components(
        fragments: &[MeasurementFragment],
        subject_col: &CanonicalColumnName,
        time_col: &CanonicalColumnName,
        kind: MeasurementKind,
    ) -> EtlResult<(
        Vec<CanonicalColumnName>,
        Vec<CrushedComponent>,
        Vec<DataFrame>,
    )> {
        use std::collections::HashSet;

        if fragments.len() == 1 {
            return Ok((
                fragments[0].components.clone(),
                Vec::new(),
                vec![fragments[0].materialize()?],
            ));
        }

        // Every distinct component mentioned by any fragment, in name order.
        let mut seen: HashSet<&CanonicalColumnName> = HashSet::new();
        let mut all_components: Vec<&CanonicalColumnName> = Vec::new();
        for frag in fragments {
            for comp in &frag.components {
                if seen.insert(comp) {
                    all_components.push(comp);
                }
            }
        }
        all_components.sort_by(|a, b| a.as_str().cmp(b.as_str()));

        let mut final_components: Vec<CanonicalColumnName> = Vec::new();
        let mut crushed: Vec<CrushedComponent> = Vec::new();
        for comp in all_components {
            let (with, without): (Vec<&MeasurementFragment>, Vec<&MeasurementFragment>) =
                fragments.iter().partition(|frag| frag.has_component(comp));
            if without.is_empty() {
                final_components.push(comp.clone());
                continue;
            }
            let sources_with = Self::distinct_sources(&with);
            let sources_without = Self::distinct_sources(&without);
            let aggregation = kind.default_aggregation();
            warn!(
                component = %comp,
                sources_with = ?sources_with,
                sources_without = ?sources_without,
                aggregation = ?aggregation,
                "Crushing component"
            );
            crushed.push(CrushedComponent {
                measurement_name: fragments[0].unit_name.clone(),
                component_name: comp.clone(),
                input_units_with: sources_with,
                input_units_without: sources_without,
                aggregation,
            });
        }

        let processed = fragments
            .iter()
            .map(|frag| {
                let needs_crushing = frag
                    .components
                    .iter()
                    .any(|c| crushed.iter().any(|cc| &cc.component_name == c));
                if needs_crushing {
                    Self::crush_fragment(frag, subject_col, time_col, &final_components, kind)
                } else {
                    frag.materialize()
                }
            })
            .collect::<EtlResult<Vec<DataFrame>>>()?;
        Ok((final_components, crushed, processed))
    }

    /// Sorted, de-duplicated source names of the given fragments.
    fn distinct_sources(fragments: &[&MeasurementFragment]) -> Vec<String> {
        let mut names: Vec<String> = fragments.iter().map(|f| f.source_name.clone()).collect();
        names.sort_unstable();
        names.dedup();
        names
    }

    /// Aggregates a fragment's value column over the components being crushed.
    ///
    /// Rows are grouped by `subject_col`, `time_col` and those `keep_components`
    /// that exist in the frame. The group keys are addressed by name, so the
    /// result does not depend on the column order of the fragment's data.
    ///
    /// For `MostRecent` / `LeastRecent` the rows are first ordered by the first
    /// crushed component (descending for `MostRecent`) and the first value of
    /// each group is kept. Every other aggregation maps to the matching Polars
    /// expression, with the mean as fallback.
    fn crush_fragment(
        frag: &MeasurementFragment,
        subject_col: &CanonicalColumnName,
        time_col: &CanonicalColumnName,
        keep_components: &[CanonicalColumnName],
        kind: MeasurementKind,
    ) -> EtlResult<DataFrame> {
        let data = frag.materialize()?;
        let present: Vec<String> = data
            .get_column_names()
            .iter()
            .map(|s| s.to_string())
            .collect();

        let mut group_cols: Vec<Expr> = vec![col(subject_col.as_str()), col(time_col.as_str())];
        group_cols.extend(
            keep_components
                .iter()
                .filter(|comp| present.iter().any(|name| name == comp.as_str()))
                .map(|comp| col(comp.as_str())),
        );

        let value_name = frag.unit_name.as_str();
        let agg = kind.default_aggregation();
        if matches!(agg, Aggregate::MostRecent | Aggregate::LeastRecent) {
            let crushed_comp = frag
                .components
                .iter()
                .find(|c| !keep_components.contains(c));
            if let Some(comp_col) = crushed_comp {
                let descending = matches!(agg, Aggregate::MostRecent);
                let mut sort_cols = group_cols.clone();
                sort_cols.push(col(comp_col.as_str()));
                // Group keys sort ascending; only the crushed component (last
                // sort key) follows the requested direction.
                let mut sort_descending = vec![false; sort_cols.len()];
                if let Some(last) = sort_descending.last_mut() {
                    *last = descending;
                }
                let value_agg = col(value_name).first().alias(value_name);
                return data
                    .lazy()
                    .sort_by_exprs(
                        sort_cols,
                        SortMultipleOptions::new().with_order_descending_multi(sort_descending),
                    )
                    .group_by(group_cols)
                    .agg([value_agg])
                    .collect()
                    .map_err(Into::into);
            }
            warn!(
                measurement = frag.unit_name.as_str(),
                "MostRecent/LeastRecent: no component to sort by, falling back to mean"
            );
        }

        let value = col(value_name);
        let agg_expr = match agg {
            Aggregate::Mean => value.mean(),
            Aggregate::Sum => value.sum(),
            Aggregate::Min => value.min(),
            Aggregate::Max => value.max(),
            Aggregate::Last => value.last(),
            Aggregate::First => value.first(),
            Aggregate::Count => value.count(),
            _ => value.mean(),
        }
        .alias(value_name);

        data.lazy()
            .group_by(group_cols)
            .agg([agg_expr])
            .collect()
            .map_err(Into::into)
    }

    /// Projects every frame to `subject, time, components..., value` and
    /// concatenates them vertically with default `UnionArgs`.
    fn stack_dataframes(
        dfs: Vec<DataFrame>,
        subject_col: &CanonicalColumnName,
        time_col: &CanonicalColumnName,
        components: &[CanonicalColumnName],
        value_col: &CanonicalColumnName,
    ) -> EtlResult<DataFrame> {
        let mut select_cols: Vec<Expr> = vec![col(subject_col.as_str()), col(time_col.as_str())];
        select_cols.extend(components.iter().map(|comp| col(comp.as_str())));
        select_cols.push(col(value_col.as_str()));

        let normalized: Vec<LazyFrame> = dfs
            .into_iter()
            .map(|df| df.lazy().select(select_cols.clone()))
            .collect();
        concat(normalized, UnionArgs::default())?
            .collect()
            .map_err(Into::into)
    }

    /// Whether `component` was aggregated away while composing this measurement.
    pub fn was_crushed(&self, component: &CanonicalColumnName) -> bool {
        self.crushed_components
            .iter()
            .any(|c| &c.component_name == component)
    }

    /// Height reported by the composed fragment reference.
    pub fn height(&self) -> usize {
        self.fragment.height()
    }
}
/// A quality unit after all of its per-source fragments have been combined.
#[derive(Debug, Clone)]
pub struct ComposedQuality {
/// Canonical unit name (also the value column name).
pub name: CanonicalColumnName,
/// Source name of every input fragment, in input order.
pub sources: Vec<String>,
/// Stacked data holding the subject and value columns, de-duplicated on subject.
pub data: DataFrame,
}
impl ComposedQuality {
    /// Combines all fragments of one quality unit.
    ///
    /// Each fragment is projected to the subject and value columns, the frames
    /// are concatenated in input order, and duplicate subjects are dropped with
    /// `UniqueKeepStrategy::First`.
    ///
    /// NOTE(review): the non-stable `unique` is used, so the row order of the
    /// result is presumably not guaranteed; confirm against the Polars docs.
    ///
    /// # Errors
    /// * `EtlError::Config` when `fragments` is empty or unit names differ.
    /// * Any error from `QualityFragment::validate` or from the Polars query.
    pub fn from_fragments(
        fragments: Vec<QualityFragment>,
        subject_col: &CanonicalColumnName,
    ) -> EtlResult<Self> {
        let unit_name = match fragments.first() {
            Some(first) => first.unit_name.clone(),
            None => {
                return Err(EtlError::Config(
                    "Cannot create ComposedQuality from empty fragments".into(),
                ));
            }
        };

        for fragment in &fragments {
            if fragment.unit_name != unit_name {
                return Err(EtlError::Config(format!(
                    "Fragment unit mismatch: expected '{}', got '{}'",
                    unit_name, fragment.unit_name
                )));
            }
            fragment.validate(subject_col)?;
        }

        let mut sources: Vec<String> = Vec::with_capacity(fragments.len());
        for fragment in &fragments {
            sources.push(fragment.source_name.clone());
        }
        debug!(
            unit = %unit_name,
            sources = ?sources,
            "Stacking quality fragments"
        );

        let mut frames: Vec<LazyFrame> = Vec::with_capacity(fragments.len());
        for fragment in fragments {
            let projected = fragment
                .data
                .lazy()
                .select([col(subject_col.as_str()), col(unit_name.as_str())]);
            frames.push(projected);
        }

        let deduplicated = concat(frames, UnionArgs::default())?
            .unique(Some(subject_col.into()), UniqueKeepStrategy::First);
        let data = deduplicated.collect()?;

        Ok(Self {
            name: unit_name,
            sources,
            data,
        })
    }

    /// Number of rows in the composed data.
    pub fn height(&self) -> usize {
        self.data.height()
    }
}
pub fn stack_all_fragments(
accumulator: FragmentAccumulator,
subject_col: &CanonicalColumnName,
time_col: &CanonicalColumnName,
) -> EtlResult<(Vec<ComposedMeasurement>, Vec<ComposedQuality>)> {
let (measurements, qualities) = accumulator.into_parts();
let composed_measurements: EtlResult<Vec<ComposedMeasurement>> = measurements
.into_values()
.map(|frags| ComposedMeasurement::from_fragments(frags, subject_col, time_col))
.collect();
let composed_qualities: EtlResult<Vec<ComposedQuality>> = qualities
.into_values()
.map(|frags| ComposedQuality::from_fragments(frags, subject_col))
.collect();
Ok((composed_measurements?, composed_qualities?))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a two-row measurement fragment (subjects "A" and "B") whose value
    /// column is named after the unit; each component becomes a constant
    /// string column.
    fn make_measurement_fragment(
        unit_name: &str,
        source_name: &str,
        components: Vec<&str>,
    ) -> MeasurementFragment {
        let mut frame = df! {
            "subject" => ["A", "B"],
            "time" => [100i64, 200],
            unit_name => [1.0, 2.0]
        }
        .unwrap();
        for component in &components {
            frame = frame
                .lazy()
                .with_column(lit("val").alias(*component))
                .collect()
                .unwrap();
        }
        let component_names: Vec<CanonicalColumnName> = components
            .into_iter()
            .map(CanonicalColumnName::from)
            .collect();
        MeasurementFragment::new(
            unit_name,
            source_name,
            MeasurementKind::Measure,
            component_names,
            frame,
        )
    }

    /// Builds a two-row quality fragment (subjects "A" and "B").
    fn make_quality_fragment(unit_name: &str, source_name: &str) -> QualityFragment {
        let frame = df! {
            "subject" => ["A", "B"],
            unit_name => ["val_a", "val_b"]
        }
        .unwrap();
        QualityFragment::new(unit_name, source_name, frame)
    }

    #[test]
    fn test_measurement_fragment_validation() {
        let fragment = make_measurement_fragment("temp", "source_a", vec!["sensor"]);
        let with_known_subject = fragment.validate(&"subject".into(), &"time".into());
        let with_unknown_subject = fragment.validate(&"wrong".into(), &"time".into());
        assert!(with_known_subject.is_ok());
        assert!(with_unknown_subject.is_err());
    }

    #[test]
    fn test_quality_fragment_validation() {
        let fragment = make_quality_fragment("name", "source_a");
        let with_known_subject = fragment.validate(&"subject".into());
        let with_unknown_subject = fragment.validate(&"wrong".into());
        assert!(with_known_subject.is_ok());
        assert!(with_unknown_subject.is_err());
    }

    #[test]
    fn test_accumulator_separates_types() {
        let mut accumulator = FragmentAccumulator::new();
        accumulator.add(make_measurement_fragment("temp", "source_a", vec![]).into());
        accumulator.add(make_quality_fragment("name", "source_a").into());
        assert_eq!(accumulator.measurement_count(), 1);
        assert_eq!(accumulator.quality_count(), 1);
        assert_eq!(accumulator.total_fragments(), 2);
    }

    #[test]
    fn test_composed_measurement_single_fragment() {
        let only = make_measurement_fragment("temp", "source_a", vec!["sensor"]);
        let composed =
            ComposedMeasurement::from_fragments(vec![only], &"subject".into(), &"time".into())
                .unwrap();
        assert_eq!(composed.name.as_str(), "temp");
        assert_eq!(composed.sources, vec!["source_a"]);
        assert_eq!(composed.components.len(), 1);
        assert!(composed.crushed_components.is_empty());
        assert!(composed.signal_policy_stats.is_empty());
    }

    #[test]
    fn test_composed_measurement_stacks_compatible() {
        let first = make_measurement_fragment("temp", "source_a", vec!["sensor"]);
        let second = make_measurement_fragment("temp", "source_b", vec!["sensor"]);
        let composed = ComposedMeasurement::from_fragments(
            vec![first, second],
            &"subject".into(),
            &"time".into(),
        )
        .unwrap();
        assert_eq!(composed.sources.len(), 2);
        // Two rows from each of the two fragments.
        assert_eq!(composed.height(), 4);
        assert!(composed.crushed_components.is_empty());
    }

    #[test]
    fn test_composed_measurement_crushes_incompatible() {
        let with_color = make_measurement_fragment("temp", "source_a", vec!["color"]);
        let without_color = make_measurement_fragment("temp", "source_b", vec![]);
        let composed = ComposedMeasurement::from_fragments(
            vec![with_color, without_color],
            &"subject".into(),
            &"time".into(),
        )
        .unwrap();
        assert_eq!(composed.crushed_components.len(), 1);
        assert_eq!(
            composed.crushed_components[0].component_name.as_str(),
            "color"
        );
        assert!(composed.components.is_empty());
    }

    #[test]
    fn test_composed_quality_deduplicates() {
        let first = QualityFragment::new(
            "name",
            "source_a",
            df! { "subject" => ["A", "B"], "name" => ["First A", "First B"] }.unwrap(),
        );
        let second = QualityFragment::new(
            "name",
            "source_b",
            df! { "subject" => ["A", "C"], "name" => ["Second A", "First C"] }.unwrap(),
        );
        let composed =
            ComposedQuality::from_fragments(vec![first, second], &"subject".into()).unwrap();
        // Subjects A, B and C: the duplicate "A" row is dropped.
        assert_eq!(composed.height(), 3);
    }

    #[test]
    fn test_stack_all_fragments() {
        let mut accumulator = FragmentAccumulator::new();
        accumulator.add_measurement(make_measurement_fragment("temp", "source_a", vec![]));
        accumulator.add_measurement(make_measurement_fragment("temp", "source_b", vec![]));
        accumulator.add_quality(make_quality_fragment("name", "source_a"));
        let (measurements, qualities) =
            stack_all_fragments(accumulator, &"subject".into(), &"time".into()).unwrap();
        assert_eq!(measurements.len(), 1);
        assert_eq!(qualities.len(), 1);
        // Two fragments of two rows each.
        assert_eq!(measurements[0].height(), 4);
        assert_eq!(qualities[0].height(), 2);
    }
}