use std::sync::Arc;
use std::time::Duration;
use polars::prelude::DataFrame;
use crate::aggregation::Aggregate;
use crate::column::CanonicalColumnName;
use crate::error::{EtlError, EtlResult};
use crate::request::EtlUnitSubsetRequest;
use crate::signal_policy::SignalPolicy;
use crate::source::SignalPolicyMode;
use crate::unit::{MeasurementKind, MeasurementUnit, NullValue};
use crate::universe::Universe;
/// Describes how a measurement's component columns are treated in a plan.
///
/// `build_measurement_plans` picks `Rollup` (with no filter) for measurements
/// that have components and `None` for those that do not; it never selects
/// `Series` itself.
#[derive(Debug, Clone)]
pub enum ComponentStrategy {
    /// Keep components as individual series.
    ///
    /// `filter` is an optional list of names (presumably the components to
    /// keep — NOTE(review): confirm against the consumer of the plan).
    Series {
        filter: Option<Vec<String>>,
    },
    /// Combine components using `aggregation`.
    ///
    /// `filter` is an optional list of names (presumably the components that
    /// take part in the rollup — NOTE(review): confirm).
    Rollup {
        filter: Option<Vec<String>>,
        aggregation: Aggregate,
    },
    /// No component handling (the measurement has no components).
    None,
}
/// How a measurement plan's data is joined (presumably onto the subject/time
/// grid — NOTE(review): confirm against the join implementation).
///
/// `build_measurement_plans` chooses `Equi` when the measurement is aligned
/// under `SignalPolicyMode::Apply`, or when its TTL does not exceed the
/// sampling interval. Otherwise it chooses `Asof`, with the TTL in
/// milliseconds as the tolerance.
///
/// The type derives `PartialEq`/`Eq` so strategies can be compared (e.g. in
/// tests or when deduplicating plans).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JoinStrategy {
    /// Join on exact key equality.
    Equi,
    /// As-of join allowing a time gap of up to `tolerance_ms` milliseconds.
    Asof { tolerance_ms: i64 },
}
/// Everything needed to process a single measurement.
///
/// Produced by `build_measurement_plans`. Most fields are copied from the
/// measurement's unit definition and the universe schema. The data frames are
/// held behind `Arc`, so cloning a plan does not copy the frames.
#[derive(Debug, Clone)]
pub struct MeasurementPlan {
    /// Canonical name of the measurement.
    pub name: CanonicalColumnName,
    /// Measurement kind, copied from `unit.kind`.
    pub kind: MeasurementKind,
    /// Name of the schema's subject column.
    pub subject_col: String,
    /// Name of the schema's time column.
    pub time_col: String,
    /// Component column names, copied from `unit.components`.
    pub component_cols: Vec<CanonicalColumnName>,
    /// Signal policy from the unit, if one is configured.
    pub signal_policy: Option<SignalPolicy>,
    /// Full unit definition the plan was built from.
    pub unit: MeasurementUnit,
    /// How component columns are handled.
    pub component_strategy: ComponentStrategy,
    /// How this measurement's data is joined.
    pub join_strategy: JoinStrategy,
    /// Null value from the unit (NOTE(review): exact semantics defined by
    /// `NullValue` — confirm).
    pub null_value: Option<NullValue>,
    /// Null value extension from the unit (NOTE(review): semantics not visible
    /// here — confirm).
    pub null_value_extension: Option<NullValue>,
    /// Pre-aligned data. `build_measurement_plans` sets this only in
    /// `SignalPolicyMode::Apply` mode, and only when aligned data exists.
    pub aligned_data: Option<Arc<DataFrame>>,
    /// The measurement's fragment materialized as a `DataFrame`.
    pub raw_data: Arc<DataFrame>,
}
/// Builds one [`MeasurementPlan`] per selected measurement of `universe`.
///
/// Selection rules:
/// - If `request.measurements` is empty, every key of `universe.measurements`
///   is considered; otherwise only the requested names are.
/// - Names the schema reports as derivations are skipped.
/// - Names in `exclude` are skipped.
///
/// For each remaining measurement:
/// - Measurements with components get [`ComponentStrategy::Rollup`] (no
///   filter, the unit's signal aggregation). All others get
///   [`ComponentStrategy::None`].
/// - The join is [`JoinStrategy::Equi`] when `mode` is
///   [`SignalPolicyMode::Apply`] and the measurement is aligned. Otherwise it
///   is [`JoinStrategy::Asof`] with the TTL in milliseconds as tolerance when
///   the TTL exceeds `interval`, and [`JoinStrategy::Equi`] when it does not.
///   Millisecond values saturate at `i64::MAX` instead of wrapping.
/// - Aligned data is attached only in [`SignalPolicyMode::Apply`] mode.
///
/// # Errors
///
/// - [`EtlError::UnitNotFound`] if a selected name is missing from
///   `universe.measurements`.
/// - [`EtlError::Polars`] if a measurement's fragment cannot be turned into a
///   `DataFrame`.
pub fn build_measurement_plans(
    universe: &Universe,
    request: &EtlUnitSubsetRequest,
    mode: SignalPolicyMode,
    interval: Duration,
    exclude: &std::collections::HashSet<CanonicalColumnName>,
) -> EtlResult<Vec<MeasurementPlan>> {
    let schema = &universe.schema;
    let subject_col = schema.subject.as_str().to_string();
    let time_col = schema.time.as_str().to_string();
    // An empty request selects every measurement the universe holds.
    let measurement_names: Vec<&CanonicalColumnName> = if request.measurements.is_empty() {
        universe.measurements.keys().collect()
    } else {
        request.measurements.iter().collect()
    };
    // `as_millis` returns an unsigned integer (u128 for std `Duration`).
    // A bare `as i64` cast wraps huge values such as `Duration::MAX` into
    // arbitrary, possibly negative numbers, which would corrupt the
    // TTL-vs-interval comparison below. Saturate at `i64::MAX` instead.
    // The interval is the same for every measurement, so convert it once.
    let interval_ms = i64::try_from(interval.as_millis()).unwrap_or(i64::MAX);
    let mut plans = Vec::with_capacity(measurement_names.len());
    for name in measurement_names {
        // Derivations are not planned from raw data here (presumably they are
        // computed elsewhere from other measurements — NOTE(review): confirm).
        if schema.get_derivation(name).is_some() {
            continue;
        }
        if exclude.contains(name) {
            continue;
        }
        let md = universe
            .measurements
            .get(name)
            .ok_or_else(|| EtlError::UnitNotFound(format!("Measurement '{}' not found", name)))?;
        let component_strategy = if md.has_components() {
            ComponentStrategy::Rollup {
                filter: None,
                aggregation: md.unit.signal_aggregation(),
            }
        } else {
            ComponentStrategy::None
        };
        let join_strategy = if mode == SignalPolicyMode::Apply && md.is_aligned() {
            JoinStrategy::Equi
        } else {
            let ttl_ms = i64::try_from(md.ttl().as_millis()).unwrap_or(i64::MAX);
            // Use a tolerance window only when the TTL is longer than one interval.
            if ttl_ms > interval_ms {
                JoinStrategy::Asof {
                    tolerance_ms: ttl_ms,
                }
            } else {
                JoinStrategy::Equi
            }
        };
        let aligned_data = if mode == SignalPolicyMode::Apply {
            md.aligned().map(|df| Arc::new(df.clone()))
        } else {
            None
        };
        let raw_data = Arc::new(md.fragment().as_dataframe().map_err(EtlError::Polars)?);
        plans.push(MeasurementPlan {
            name: name.clone(),
            kind: md.unit.kind,
            subject_col: subject_col.clone(),
            time_col: time_col.clone(),
            component_cols: md.unit.components.clone(),
            signal_policy: md.unit.signal_policy.clone(),
            unit: md.unit.clone(),
            component_strategy,
            join_strategy,
            null_value: md.unit.null_value.clone(),
            null_value_extension: md.unit.null_value_extension.clone(),
            aligned_data,
            raw_data,
        });
    }
    Ok(plans)
}