pub mod phases;
pub mod plan;
pub use phases::{
Complete, Crushed, Drift, DriftSeverity, Expanded, Filtered, Joined, PhaseDiag,
ProcessingPhase, RawData, SignalApplied,
};
pub use plan::{ComponentStrategy, JoinStrategy, MeasurementPlan};
use crate::error::EtlResult;
use polars::prelude::DataFrame;
pub fn execute_measurement(
plan: &MeasurementPlan,
grid: DataFrame,
time_bounds: (i64, i64),
subjects: Option<&[String]>,
) -> EtlResult<(DataFrame, Vec<PhaseDiag>)> {
let mut diags = Vec::new();
let raw = RawData::new(plan)?;
let (filtered, filter_diag) =
raw.filter(&plan.time_col, &plan.subject_col, time_bounds, subjects)?;
diags.push(filter_diag);
let (signal_applied, signal_diag) = filtered.apply_signal_policy(plan)?;
diags.push(signal_diag);
let (joined, join_diag) = match &plan.component_strategy {
ComponentStrategy::Rollup {
filter,
aggregation,
} => {
let (crushed, crush_diag) =
signal_applied.crush(*aggregation, filter.as_deref(), plan)?;
diags.push(crush_diag);
crushed.join_onto(grid, plan)?
}
ComponentStrategy::Series { filter } => {
let (expanded, expand_diag) = signal_applied.expand(filter.as_deref(), plan)?;
diags.push(expand_diag);
expanded.join_onto(grid, plan)?
}
ComponentStrategy::None => {
let (crushed, skip_diag) = signal_applied.skip_components()?;
diags.push(skip_diag);
crushed.join_onto(grid, plan)?
}
};
diags.push(join_diag);
let (complete, fill_diag) = joined.fill_nulls(plan)?;
diags.push(fill_diag);
Ok((complete.into_dataframe(), diags))
}