use std::collections::HashMap;
use polars::prelude::*;
use serde::{Deserialize, Serialize};
use super::{
GroupBy, MissingQualityPolicy,
planner::{GroupAggregationPlan, GroupAggregationPlanner},
};
use crate::{
CanonicalColumnName,
aggregation::Aggregate,
error::{EtlError, EtlResult},
subset::{
StageOutcome,
stages::{StageDiag, SubsetStage},
},
unit::MeasurementUnit,
};
/// One statistics record for a single measurement inside a single
/// (quality group, time bucket) cell, as emitted by `extract_group_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupStats {
/// Human-readable group label: the stringified quality values joined with
/// `" | "`; a null quality is rendered as `MissingQualityPolicy::SYNTHETIC_LABEL`.
pub group_label: String,
/// Stringified value of every grouping quality for this cell (`None` when
/// the value is null after the missing-quality policy has been applied).
pub quality_values: HashMap<CanonicalColumnName, Option<String>>,
/// Raw `i64` physical value of the time column for this cell.
/// NOTE(review): the field name implies milliseconds, but the value is taken
/// from the column's physical representation as-is — confirm the time column
/// is always millisecond-precision upstream.
pub bucket_start_ms: i64,
/// Measurement column this record describes.
pub measurement: CanonicalColumnName,
/// Aggregation requested by the plan for this measurement.
pub aggregation: Aggregate,
/// Count of non-null measurement values that fell into this cell.
pub n_subjects_contributing: usize,
/// Aggregated value, cast to `f64`; `None` when the aggregate is null.
pub value: Option<f64>,
/// Sample standard deviation (ddof = 1) divided by `sqrt(n)`; `None` when the
/// standard deviation is null or `n` is zero.
pub stderr: Option<f64>,
/// Smallest measurement value in the cell, if any.
pub min: Option<f64>,
/// Largest measurement value in the cell, if any.
pub max: Option<f64>,
}
/// Result of `run_group_by`: the aggregated frame plus per-cell statistics.
#[derive(Debug)]
pub struct GroupAggregateOutput {
/// Aggregated frame produced by `build_main_dataframe`: the subject column
/// holds the group label, followed by the time column and one column per
/// planned measurement.
pub data: DataFrame,
/// One `GroupStats` entry per (group, time bucket, measurement) combination.
pub stats: Vec<GroupStats>,
}
/// Pipeline-stage wrapper around [`run_group_by`].
///
/// The stage is a passthrough (data returned untouched, no stats, no
/// diagnostics) when no `group_by` is configured or when `plans` is empty.
/// Otherwise the aggregation runs and the outcome carries the aggregated
/// frame, the per-cell statistics and a `StageDiag` describing the run.
///
/// # Errors
///
/// * `EtlError::Config` when grouping is requested but `qualities_df` is `None`.
/// * Any error surfaced by [`run_group_by`].
pub fn apply_group_by(
    data: DataFrame,
    group_by: Option<&GroupBy>,
    qualities_df: Option<&DataFrame>,
    plans: Vec<GroupAggregationPlan>,
    subject_col: &str,
    time_col: &str,
) -> EtlResult<StageOutcome<GroupStats>> {
    // Nothing to do unless grouping is configured AND something is planned.
    let spec = match group_by {
        Some(spec) if !plans.is_empty() => spec,
        _ => return Ok(StageOutcome::passthrough(data)),
    };
    let qualities = qualities_df.ok_or_else(|| {
        EtlError::Config(
            "apply_group_by: group_by requested but no qualities DataFrame available".into(),
        )
    })?;

    let timer = std::time::Instant::now();
    let outcome = run_group_by(&data, qualities, spec, &plans, subject_col, time_col)?;

    // Several stats rows share one label (one per bucket and measurement),
    // so the group count is the number of distinct labels.
    let distinct_labels: std::collections::HashSet<&str> = outcome
        .stats
        .iter()
        .map(|row| row.group_label.as_str())
        .collect();
    let n_groups = distinct_labels.len();

    let per_plan: Vec<String> = plans
        .iter()
        .map(|p| format!("{}={:?}", p.measurement.as_str(), p.aggregation))
        .collect();
    let paths_summary = per_plan.join(", ");

    let quality_names: Vec<String> = spec
        .qualities
        .iter()
        .map(|q| q.as_str().to_string())
        .collect();

    let diag = StageDiag {
        stage: SubsetStage::GroupBy {
            qualities: quality_names,
            missing_policy: format!("{:?}", spec.missing_policy),
            n_groups,
            n_measurements: plans.len(),
        },
        rows_after: outcome.data.height(),
        elapsed_us: timer.elapsed().as_micros() as u64,
        notes: vec![
            format!("per-measurement aggregation: {paths_summary}"),
            format!("stats_rows: {}", outcome.stats.len()),
        ],
    };

    Ok(StageOutcome::executed(outcome.data, outcome.stats, diag))
}
/// Builds one aggregation plan per measurement unit.
///
/// Returns an empty vector when no `group_by` is configured; otherwise each
/// unit is handed to a `GroupAggregationPlanner` and its plan is collected in
/// input order.
pub fn build_group_plans(
    units: &[&MeasurementUnit],
    group_by: Option<&GroupBy>,
) -> Vec<GroupAggregationPlan> {
    match group_by {
        None => Vec::new(),
        Some(spec) => units
            .iter()
            .map(|unit| GroupAggregationPlanner::new(unit, spec).plan())
            .collect(),
    }
}
/// Aggregates `subset_df` across subjects that share the same quality values.
///
/// Steps:
/// 1. validate that every referenced column exists;
/// 2. project the qualities frame to `subject + qualities` and drop fully
///    duplicated rows;
/// 3. left-join the qualities onto the subset by subject;
/// 4. apply the configured missing-quality policy;
/// 5. group by `(qualities..., time)` and compute, per planned measurement,
///    the planned aggregate plus count / sample-std / min / max sidecars;
/// 6. derive the `GroupStats` rows and the slimmed-down main frame.
///
/// NOTE(review): de-duplication in step 2 is on whole rows, so a subject that
/// has several *different* quality rows would fan out in the join — confirm
/// the qualities frame is one row per subject upstream.
///
/// # Errors
///
/// Returns `EtlError::Config` / `EtlError::DataProcessing` for invalid inputs
/// or any failing polars operation.
pub fn run_group_by(
    subset_df: &DataFrame,
    qualities_df: &DataFrame,
    group_by: &GroupBy,
    plans: &[GroupAggregationPlan],
    subject_col: &str,
    time_col: &str,
) -> EtlResult<GroupAggregateOutput> {
    validate_inputs(subset_df, qualities_df, group_by, plans, subject_col, time_col)?;

    let quality_names: Vec<&str> = group_by.qualities.iter().map(|q| q.as_str()).collect();

    // Subject key plus the grouping qualities, de-duplicated.
    let projection: Vec<Expr> = std::iter::once(col(subject_col))
        .chain(quality_names.iter().map(|name| col(*name)))
        .collect();
    let quality_lookup = qualities_df
        .clone()
        .lazy()
        .select(projection)
        .unique_stable(None, UniqueKeepStrategy::First)
        .collect()
        .map_err(|e| EtlError::DataProcessing(format!("run_group_by: project qualities: {e}")))?;

    // Left join keeps subjects that have no quality row (their qualities are null).
    let with_qualities = subset_df
        .clone()
        .lazy()
        .join(
            quality_lookup.lazy(),
            [col(subject_col)],
            [col(subject_col)],
            JoinArgs::new(JoinType::Left),
        )
        .collect()
        .map_err(|e| EtlError::DataProcessing(format!("run_group_by: quality join: {e}")))?;
    let with_qualities = apply_missing_policy(with_qualities, group_by)?;

    let keys: Vec<Expr> = quality_names
        .iter()
        .map(|name| col(*name))
        .chain(std::iter::once(col(time_col)))
        .collect();
    let sort_order: Vec<String> = quality_names
        .iter()
        .map(|name| (*name).to_string())
        .chain(std::iter::once(time_col.to_string()))
        .collect();

    // Five output columns per plan: the aggregate itself and four sidecars.
    let aggregations: Vec<Expr> = plans
        .iter()
        .flat_map(|plan| {
            let name = plan.measurement.as_str();
            [
                aggregation_expr(plan.aggregation, name).alias(name),
                col(name).count().alias(n_col(name)),
                col(name).std(1).alias(std_col(name)),
                col(name).min().alias(min_col(name)),
                col(name).max().alias(max_col(name)),
            ]
        })
        .collect();

    let grouped = with_qualities
        .lazy()
        .group_by(keys)
        .agg(aggregations)
        .sort(sort_order, SortMultipleOptions::default())
        .collect()
        .map_err(|e| EtlError::DataProcessing(format!("run_group_by: aggregation: {e}")))?;

    let stats = extract_group_stats(&grouped, group_by, plans, time_col)?;
    let data = build_main_dataframe(grouped, group_by, plans, subject_col, time_col)?;
    Ok(GroupAggregateOutput { data, stats })
}
/// Checks that the group-by configuration and both frames are usable.
///
/// Checks run in this order and the first failure is returned:
/// non-empty quality list, subject and time columns in the subset, subject
/// column in the qualities frame, every quality column in the qualities
/// frame, every planned measurement column in the subset.
fn validate_inputs(
    subset_df: &DataFrame,
    qualities_df: &DataFrame,
    group_by: &GroupBy,
    plans: &[GroupAggregationPlan],
    subject_col: &str,
    time_col: &str,
) -> EtlResult<()> {
    if group_by.qualities.is_empty() {
        return Err(EtlError::Config(
            "GroupBy.qualities must not be empty".into(),
        ));
    }

    if let Err(e) = subset_df.column(subject_col) {
        return Err(EtlError::DataProcessing(format!(
            "run_group_by: subject column '{subject_col}' missing in subset: {e}"
        )));
    }
    if let Err(e) = subset_df.column(time_col) {
        return Err(EtlError::DataProcessing(format!(
            "run_group_by: time column '{time_col}' missing in subset: {e}"
        )));
    }
    if let Err(e) = qualities_df.column(subject_col) {
        return Err(EtlError::DataProcessing(format!(
            "run_group_by: subject column '{subject_col}' missing in qualities: {e}"
        )));
    }

    group_by.qualities.iter().try_for_each(|q| {
        match qualities_df.column(q.as_str()) {
            Ok(_) => Ok(()),
            Err(e) => Err(EtlError::DataProcessing(format!(
                "run_group_by: quality '{}' missing in qualities DataFrame: {e}",
                q.as_str(),
            ))),
        }
    })?;

    plans.iter().try_for_each(|plan| {
        let name = plan.measurement.as_str();
        match subset_df.column(name) {
            Ok(_) => Ok(()),
            Err(e) => Err(EtlError::DataProcessing(format!(
                "run_group_by: measurement '{name}' missing in subset: {e}"
            ))),
        }
    })
}
/// Applies the configured `MissingQualityPolicy` to the joined frame.
///
/// * `Drop` — removes every row in which any grouping quality is null.
/// * `SyntheticGroup` — replaces null qualities with
///   `MissingQualityPolicy::SYNTHETIC_LABEL` so those rows form their own group.
/// * `Error` — fails when at least one row has a null quality; otherwise the
///   frame is returned unchanged.
///
/// An empty quality list is handled without panicking: there is nothing that
/// could be missing, so every policy returns the frame unchanged.
///
/// # Errors
///
/// `EtlError::DataProcessing` when a polars operation fails, or when the
/// policy is `Error` and rows with missing qualities exist.
fn apply_missing_policy(joined: DataFrame, group_by: &GroupBy) -> EtlResult<DataFrame> {
    match group_by.missing_policy {
        MissingQualityPolicy::Drop => group_by
            .qualities
            .iter()
            .fold(joined.lazy(), |lf, q| {
                lf.filter(col(q.as_str()).is_not_null())
            })
            .collect()
            .map_err(|e| {
                EtlError::DataProcessing(format!(
                    "run_group_by: missing-policy drop filter failed: {e}"
                ))
            }),
        MissingQualityPolicy::SyntheticGroup => group_by
            .qualities
            .iter()
            .fold(joined.lazy(), |lf, q| {
                lf.with_column(
                    col(q.as_str())
                        .fill_null(lit(MissingQualityPolicy::SYNTHETIC_LABEL))
                        .alias(q.as_str()),
                )
            })
            .collect()
            .map_err(|e| {
                EtlError::DataProcessing(format!(
                    "run_group_by: missing-policy synthetic fill failed: {e}"
                ))
            }),
        MissingQualityPolicy::Error => {
            // OR together "is null" over every quality. `reduce` yields `None`
            // for an empty list, which replaces the previous `[0]` index that
            // would have panicked in that case.
            let Some(missing_any) = group_by
                .qualities
                .iter()
                .map(|q| col(q.as_str()).is_null())
                .reduce(|acc, next| acc.or(next))
            else {
                return Ok(joined);
            };
            let missing_count = joined
                .clone()
                .lazy()
                .filter(missing_any)
                .collect()
                .map_err(|e| {
                    EtlError::DataProcessing(format!(
                        "run_group_by: missing-policy error check failed: {e}"
                    ))
                })?
                .height();
            if missing_count > 0 {
                return Err(EtlError::DataProcessing(format!(
                    "run_group_by: {missing_count} rows have missing quality values \
                     (policy = Error). Configure MissingQualityPolicy::SyntheticGroup \
                     or Drop to handle them."
                )));
            }
            Ok(joined)
        }
    }
}
/// Translates an `Aggregate` into the polars expression applied to
/// `col_name` across the subjects of one group.
///
/// `Any`/`All` map to `max`/`min`, `Count` is cast to `Float64`, and the
/// variants without a dedicated cross-subject expression (`MostRecent`,
/// `LeastRecent`, `LinearTrend`, `Auto`) use the mean.
fn aggregation_expr(agg: Aggregate, col_name: &str) -> Expr {
    let column = col(col_name);
    match agg {
        Aggregate::Sum => column.sum(),
        Aggregate::Min | Aggregate::All => column.min(),
        Aggregate::Max | Aggregate::Any => column.max(),
        Aggregate::Count => column.count().cast(DataType::Float64),
        Aggregate::First => column.first(),
        Aggregate::Last => column.last(),
        Aggregate::Mean
        | Aggregate::MostRecent
        | Aggregate::LeastRecent
        | Aggregate::LinearTrend
        | Aggregate::Auto => column.mean(),
    }
}
/// Name of the sidecar column holding the non-null count for `name`.
fn n_col(name: &str) -> String {
    ["__", name, "__n"].concat()
}
/// Name of the sidecar column holding the sample standard deviation for `name`.
fn std_col(name: &str) -> String {
    let mut column = String::with_capacity(name.len() + 7);
    column.push_str("__");
    column.push_str(name);
    column.push_str("__std");
    column
}
/// Name of the sidecar column holding the group minimum for `name`.
fn min_col(name: &str) -> String {
    ["__", name, "__min"].concat()
}
/// Name of the sidecar column holding the group maximum for `name`.
fn max_col(name: &str) -> String {
    let mut column = String::from("__");
    column.push_str(name);
    column.push_str("__max");
    column
}
fn extract_group_stats(
grouped: &DataFrame,
group_by: &GroupBy,
plans: &[GroupAggregationPlan],
time_col: &str,
) -> EtlResult<Vec<GroupStats>> {
let rows = grouped.height();
let mut stats = Vec::with_capacity(rows * plans.len());
let mut quality_cols: Vec<(CanonicalColumnName, StringChunked)> = Vec::new();
for q in &group_by.qualities {
let series = grouped
.column(q.as_str())
.map_err(|e| {
EtlError::DataProcessing(format!(
"extract_group_stats: quality '{}' missing: {e}",
q.as_str(),
))
})?
.cast(&DataType::String)
.map_err(|e| {
EtlError::DataProcessing(format!(
"extract_group_stats: cast quality '{}' to String: {e}",
q.as_str(),
))
})?;
let string_series = series
.str()
.map_err(|e| {
EtlError::DataProcessing(format!(
"extract_group_stats: '{}' not String after cast: {e}",
q.as_str(),
))
})?
.clone();
quality_cols.push((q.clone(), string_series));
}
let time_phys = grouped
.column(time_col)
.map_err(|e| {
EtlError::DataProcessing(format!(
"extract_group_stats: time column '{time_col}' missing: {e}"
))
})?
.to_physical_repr()
.i64()
.map_err(|e| {
EtlError::DataProcessing(format!(
"extract_group_stats: time column not i64-backed: {e}"
))
})?
.clone();
let labels: Vec<String> = (0..rows)
.map(|i| {
quality_cols
.iter()
.map(|(_, ca)| {
ca.get(i)
.unwrap_or(MissingQualityPolicy::SYNTHETIC_LABEL)
.to_string()
})
.collect::<Vec<_>>()
.join(" | ")
})
.collect();
struct MeasurementStatsCols {
name: CanonicalColumnName,
aggregation: Aggregate,
value: Float64Chunked,
n: IdxCa,
std: Float64Chunked,
min: Float64Chunked,
max: Float64Chunked,
}
let mut per_m: Vec<MeasurementStatsCols> = Vec::with_capacity(plans.len());
for plan in plans {
let name = plan.measurement.as_str();
per_m.push(MeasurementStatsCols {
name: plan.measurement.clone(),
aggregation: plan.aggregation,
value: cast_f64(grouped, name)?,
n: cast_idx(grouped, &n_col(name))?,
std: cast_f64(grouped, &std_col(name))?,
min: cast_f64(grouped, &min_col(name))?,
max: cast_f64(grouped, &max_col(name))?,
});
}
for i in 0..rows {
let label = &labels[i];
let Some(ts) = time_phys.get(i) else {
continue;
};
let quality_values: HashMap<CanonicalColumnName, Option<String>> = quality_cols
.iter()
.map(|(name, ca)| (name.clone(), ca.get(i).map(|s| s.to_string())))
.collect();
for m in &per_m {
let n = m.n.get(i).unwrap_or(0) as usize;
let value = m.value.get(i);
let std = m.std.get(i);
let min = m.min.get(i);
let max = m.max.get(i);
let stderr = match (std, n) {
(Some(s), n) if n > 0 => Some(s / (n as f64).sqrt()),
_ => None,
};
stats.push(GroupStats {
group_label: label.clone(),
quality_values: quality_values.clone(),
bucket_start_ms: ts,
measurement: m.name.clone(),
aggregation: m.aggregation,
n_subjects_contributing: n,
value,
stderr,
min,
max,
});
}
}
Ok(stats)
}
/// Shapes the grouped frame into the stage's main output.
///
/// The output contains the group label (stringified qualities joined with
/// `" | "`, stored under `subject_col`), the time column, and one column per
/// planned measurement. Quality columns and statistic sidecars are not
/// selected. Rows are sorted by label, then time.
///
/// # Errors
///
/// `EtlError::DataProcessing` when the select/sort pipeline fails.
fn build_main_dataframe(
    grouped: DataFrame,
    group_by: &GroupBy,
    plans: &[GroupAggregationPlan],
    subject_col: &str,
    time_col: &str,
) -> EtlResult<DataFrame> {
    let label_parts: Vec<Expr> = group_by
        .qualities
        .iter()
        .map(|q| col(q.as_str()).cast(DataType::String))
        .collect();
    let label = concat_str(label_parts, " | ", false).alias(subject_col);

    let projection: Vec<Expr> = [label, col(time_col)]
        .into_iter()
        .chain(plans.iter().map(|plan| col(plan.measurement.as_str())))
        .collect();

    let shaped = grouped
        .lazy()
        .select(projection)
        .sort([subject_col, time_col], SortMultipleOptions::default())
        .collect();
    shaped.map_err(|e| EtlError::DataProcessing(format!("build_main_dataframe: select/sort: {e}")))
}
/// Returns column `name` of `df` as an owned `Float64Chunked`, casting to
/// `Float64` first.
///
/// # Errors
///
/// `EtlError::DataProcessing` when the column is missing or the cast fails.
fn cast_f64(df: &DataFrame, name: &str) -> EtlResult<Float64Chunked> {
    let column = df
        .column(name)
        .map_err(|e| EtlError::DataProcessing(format!("column '{name}' missing: {e}")))?;
    let as_float = column
        .as_materialized_series()
        .cast(&DataType::Float64)
        .map_err(|e| EtlError::DataProcessing(format!("cast '{name}' to f64: {e}")))?;
    let values = as_float
        .f64()
        .map_err(|e| EtlError::DataProcessing(format!("'{name}' not f64 after cast: {e}")))?;
    Ok(values.clone())
}
/// Returns column `name` of `df` as an owned `IdxCa`.
///
/// The column is used directly when it already has the index dtype;
/// otherwise it is cast to `IDX_DTYPE` first.
///
/// # Errors
///
/// `EtlError::DataProcessing` when the column is missing or the cast fails.
fn cast_idx(df: &DataFrame, name: &str) -> EtlResult<IdxCa> {
    let series = df
        .column(name)
        .map_err(|e| EtlError::DataProcessing(format!("column '{name}' missing: {e}")))?
        .as_materialized_series();

    // Fast path: already index-typed, no cast needed.
    if let Ok(ca) = series.idx() {
        return Ok(ca.clone());
    }

    let casted = series
        .cast(&polars::prelude::IDX_DTYPE)
        .map_err(|e| EtlError::DataProcessing(format!("cast '{name}' to IDX: {e}")))?;
    let ca = casted
        .idx()
        .map_err(|e| EtlError::DataProcessing(format!("'{name}' not IDX after cast: {e}")))?;
    Ok(ca.clone())
}
#[cfg(test)]
mod tests {
    //! Unit tests for the group-by stage: aggregation across subjects,
    //! missing-quality policies, composite labels, output shape, passthrough
    //! behaviour and input validation.

    use chrono::{TimeZone as _, Utc};

    use super::*;

    /// Name of the subject column used by every fixture.
    const SUBJECT: &str = "subject";
    /// Name of the time column used by every fixture.
    const TIME: &str = "time";

    /// Millisecond timestamp of 2025-01-01T00:00:00Z plus `day` whole days.
    fn ts(day: i64) -> i64 {
        Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0)
            .unwrap()
            .timestamp_millis()
            + day * 24 * 3_600_000
    }

    /// Builds a subset frame with a subject column, a UTC millisecond
    /// datetime column and a single measurement column.
    fn build_subset(
        subjects: &[&str],
        days: &[i64],
        measurement: &str,
        values: &[Option<f64>],
    ) -> DataFrame {
        assert_eq!(subjects.len(), days.len());
        assert_eq!(subjects.len(), values.len());
        let timestamps: Vec<i64> = days.iter().map(|d| ts(*d)).collect();
        let time_ca = Int64Chunked::new(TIME.into(), &timestamps)
            .into_datetime(TimeUnit::Milliseconds, Some(polars::prelude::TimeZone::UTC));
        DataFrame::new(vec![
            Column::new(SUBJECT.into(), subjects),
            time_ca.into_column(),
            Column::new(measurement.into(), values),
        ])
        .unwrap()
    }

    /// Builds a qualities frame with a subject column and a `parish` column.
    fn build_qualities(subjects: &[&str], parishes: &[Option<&str>]) -> DataFrame {
        assert_eq!(subjects.len(), parishes.len());
        DataFrame::new(vec![
            Column::new(SUBJECT.into(), subjects),
            Column::new("parish".into(), parishes),
        ])
        .unwrap()
    }

    /// Aggregation plan fixture for measurement `name` with aggregation `agg`.
    fn plan(name: &str, agg: Aggregate) -> GroupAggregationPlan {
        GroupAggregationPlan {
            measurement: CanonicalColumnName::new(name),
            aggregation: agg,
            aggregation_source: crate::interval::AggregationSource::Schema,
            reason: "test fixture".to_string(),
        }
    }

    /// `GroupBy` fixture over `qualities` with the given missing-quality policy.
    fn group_by(qualities: &[&str], policy: MissingQualityPolicy) -> GroupBy {
        GroupBy {
            qualities: qualities
                .iter()
                .map(|q| CanonicalColumnName::new(*q))
                .collect(),
            aggregation_override: None,
            missing_policy: policy,
        }
    }

    /// Subjects with the same parish are aggregated into one group.
    #[test]
    fn subjects_sharing_a_quality_aggregate_together() {
        let subset = build_subset(
            &["A", "B", "C"],
            &[0, 0, 0],
            "sump",
            &[Some(2.0), Some(4.0), Some(10.0)],
        );
        let qualities = build_qualities(
            &["A", "B", "C"],
            &[Some("Orleans"), Some("Orleans"), Some("Jefferson")],
        );
        let gb = group_by(&["parish"], MissingQualityPolicy::SyntheticGroup);
        let plans = vec![plan("sump", Aggregate::Mean)];
        let out = run_group_by(&subset, &qualities, &gb, &plans, SUBJECT, TIME).unwrap();
        assert_eq!(out.data.height(), 2);
        assert_eq!(out.stats.len(), 2);
        let orleans = out
            .stats
            .iter()
            .find(|s| s.group_label == "Orleans")
            .expect("Orleans group present");
        assert_eq!(orleans.n_subjects_contributing, 2);
        assert_eq!(orleans.value, Some(3.0));
        assert_eq!(orleans.min, Some(2.0));
        assert_eq!(orleans.max, Some(4.0));
        let jefferson = out
            .stats
            .iter()
            .find(|s| s.group_label == "Jefferson")
            .expect("Jefferson group present");
        assert_eq!(jefferson.n_subjects_contributing, 1);
        assert_eq!(jefferson.value, Some(10.0));
    }

    /// Null measurement values are excluded from both the count and the mean.
    #[test]
    fn null_subject_values_do_not_contribute_to_n() {
        let subset = build_subset(
            &["A", "B", "C"],
            &[0, 0, 0],
            "sump",
            &[Some(2.0), None, Some(4.0)],
        );
        let qualities = build_qualities(
            &["A", "B", "C"],
            &[Some("Orleans"), Some("Orleans"), Some("Orleans")],
        );
        let gb = group_by(&["parish"], MissingQualityPolicy::SyntheticGroup);
        let plans = vec![plan("sump", Aggregate::Mean)];
        let out = run_group_by(&subset, &qualities, &gb, &plans, SUBJECT, TIME).unwrap();
        let orleans = &out.stats[0];
        assert_eq!(orleans.n_subjects_contributing, 2, "B is null, not counted");
        assert_eq!(orleans.value, Some(3.0), "mean of 2 and 4");
    }

    /// `SyntheticGroup` keeps subjects without a quality in a synthetic group.
    #[test]
    fn missing_quality_synthetic_group_preserves_subjects() {
        let subset = build_subset(&["A", "B"], &[0, 0], "sump", &[Some(2.0), Some(4.0)]);
        let qualities = build_qualities(&["A", "B"], &[Some("Orleans"), None]);
        let gb = group_by(&["parish"], MissingQualityPolicy::SyntheticGroup);
        let plans = vec![plan("sump", Aggregate::Mean)];
        let out = run_group_by(&subset, &qualities, &gb, &plans, SUBJECT, TIME).unwrap();
        assert_eq!(out.stats.len(), 2, "Orleans + __unspecified__");
        let synthetic = out
            .stats
            .iter()
            .find(|s| s.group_label == MissingQualityPolicy::SYNTHETIC_LABEL)
            .expect("synthetic group present");
        assert_eq!(synthetic.n_subjects_contributing, 1);
        assert_eq!(synthetic.value, Some(4.0));
    }

    /// `Drop` discards subjects whose quality is missing.
    #[test]
    fn missing_quality_drop_removes_subjects() {
        let subset = build_subset(&["A", "B"], &[0, 0], "sump", &[Some(2.0), Some(4.0)]);
        let qualities = build_qualities(&["A", "B"], &[Some("Orleans"), None]);
        let gb = group_by(&["parish"], MissingQualityPolicy::Drop);
        let plans = vec![plan("sump", Aggregate::Mean)];
        let out = run_group_by(&subset, &qualities, &gb, &plans, SUBJECT, TIME).unwrap();
        assert_eq!(out.stats.len(), 1, "only Orleans survives");
        let orleans = &out.stats[0];
        assert_eq!(orleans.group_label, "Orleans");
        assert_eq!(orleans.n_subjects_contributing, 1);
        assert_eq!(orleans.value, Some(2.0));
    }

    /// `Error` surfaces missing qualities as an `EtlError`.
    #[test]
    fn missing_quality_error_returns_etl_error() {
        let subset = build_subset(&["A", "B"], &[0, 0], "sump", &[Some(2.0), Some(4.0)]);
        let qualities = build_qualities(&["A", "B"], &[Some("Orleans"), None]);
        let gb = group_by(&["parish"], MissingQualityPolicy::Error);
        let plans = vec![plan("sump", Aggregate::Mean)];
        let err = run_group_by(&subset, &qualities, &gb, &plans, SUBJECT, TIME);
        assert!(err.is_err());
        let msg = format!("{}", err.unwrap_err());
        assert!(msg.contains("missing quality values"), "msg = {msg}");
    }

    /// Several qualities produce a `" | "`-joined composite label and a
    /// value-map entry per quality.
    #[test]
    fn multiple_qualities_build_composite_group_label() {
        let subset = build_subset(
            &["A", "B", "C"],
            &[0, 0, 0],
            "sump",
            &[Some(1.0), Some(2.0), Some(3.0)],
        );
        let qualities = DataFrame::new(vec![
            Column::new(SUBJECT.into(), &["A", "B", "C"]),
            Column::new(
                "parish".into(),
                &[Some("Orleans"), Some("Orleans"), Some("Jefferson")],
            ),
            Column::new(
                "pump_type".into(),
                &[Some("Large"), Some("Small"), Some("Large")],
            ),
        ])
        .unwrap();
        let gb = group_by(
            &["parish", "pump_type"],
            MissingQualityPolicy::SyntheticGroup,
        );
        let plans = vec![plan("sump", Aggregate::Mean)];
        let out = run_group_by(&subset, &qualities, &gb, &plans, SUBJECT, TIME).unwrap();
        assert_eq!(out.stats.len(), 3);
        let labels: std::collections::HashSet<&str> =
            out.stats.iter().map(|s| s.group_label.as_str()).collect();
        assert!(labels.contains("Orleans | Large"));
        assert!(labels.contains("Orleans | Small"));
        assert!(labels.contains("Jefferson | Large"));
        for s in &out.stats {
            assert_eq!(s.quality_values.len(), 2);
            assert!(
                s.quality_values
                    .contains_key(&CanonicalColumnName::new("parish"))
            );
            assert!(
                s.quality_values
                    .contains_key(&CanonicalColumnName::new("pump_type"))
            );
        }
    }

    /// Each measurement uses the aggregation from its own plan.
    #[test]
    fn different_measurements_respect_per_plan_aggregation() {
        let subset = DataFrame::new(vec![
            Column::new(SUBJECT.into(), &["A", "B"]),
            Int64Chunked::new(TIME.into(), &[ts(0), ts(0)])
                .into_datetime(TimeUnit::Milliseconds, Some(polars::prelude::TimeZone::UTC))
                .into_column(),
            Column::new("sump".into(), &[Some(2.0), Some(4.0)]),
            Column::new("engines_on_count".into(), &[Some(1.0), Some(1.0)]),
        ])
        .unwrap();
        let qualities = build_qualities(&["A", "B"], &[Some("Orleans"), Some("Orleans")]);
        let gb = group_by(&["parish"], MissingQualityPolicy::SyntheticGroup);
        let plans = vec![
            plan("sump", Aggregate::Mean),
            plan("engines_on_count", Aggregate::Sum),
        ];
        let out = run_group_by(&subset, &qualities, &gb, &plans, SUBJECT, TIME).unwrap();
        let sump = out
            .stats
            .iter()
            .find(|s| s.measurement.as_str() == "sump")
            .unwrap();
        assert_eq!(sump.value, Some(3.0));
        let engines = out
            .stats
            .iter()
            .find(|s| s.measurement.as_str() == "engines_on_count")
            .unwrap();
        assert_eq!(engines.value, Some(2.0));
    }

    /// The standard error equals the sample standard deviation over `sqrt(n)`.
    #[test]
    fn stderr_of_group_follows_sample_std_over_sqrt_n() {
        let subset = build_subset(
            &["A", "B", "C", "D"],
            &[0, 0, 0, 0],
            "x",
            &[Some(1.0), Some(2.0), Some(3.0), Some(4.0)],
        );
        let qualities = build_qualities(&["A", "B", "C", "D"], &[Some("One"); 4]);
        let gb = group_by(&["parish"], MissingQualityPolicy::SyntheticGroup);
        let plans = vec![plan("x", Aggregate::Mean)];
        let out = run_group_by(&subset, &qualities, &gb, &plans, SUBJECT, TIME).unwrap();
        let row = &out.stats[0];
        assert_eq!(row.n_subjects_contributing, 4);
        assert_eq!(row.value, Some(2.5));
        // Sample variance of 1,2,3,4 is 5/3.
        let expected_std = (5.0_f64 / 3.0).sqrt();
        let expected_stderr = expected_std / 4.0_f64.sqrt();
        let actual = row.stderr.expect("stderr present with N > 1");
        assert!(
            (actual - expected_stderr).abs() < 1e-6,
            "stderr: expected {expected_stderr}, got {actual}",
        );
    }

    /// The main frame carries the group label in the subject column and
    /// exposes neither quality columns nor statistic sidecars.
    #[test]
    fn main_dataframe_replaces_subject_with_group_label() {
        let subset = build_subset(&["A", "B"], &[0, 0], "sump", &[Some(2.0), Some(4.0)]);
        let qualities = build_qualities(&["A", "B"], &[Some("Orleans"), Some("Orleans")]);
        let gb = group_by(&["parish"], MissingQualityPolicy::SyntheticGroup);
        let plans = vec![plan("sump", Aggregate::Mean)];
        let out = run_group_by(&subset, &qualities, &gb, &plans, SUBJECT, TIME).unwrap();
        let names: Vec<&str> = out.data.get_column_names_str().into_iter().collect();
        assert!(names.contains(&SUBJECT));
        assert!(names.contains(&TIME));
        assert!(names.contains(&"sump"));
        assert!(
            !names.contains(&"parish"),
            "quality columns dropped from main DF"
        );
        assert!(
            !names.iter().any(|n| n.starts_with("__")),
            "no stat sidecars leak"
        );
        let subj_col = out.data.column(SUBJECT).unwrap().str().unwrap();
        assert_eq!(subj_col.get(0), Some("Orleans"));
    }

    /// Without a `GroupBy` the stage passes the data through untouched.
    #[test]
    fn apply_group_by_with_none_is_passthrough() {
        let subset = build_subset(&["A"], &[0], "sump", &[Some(1.0)]);
        let qualities = build_qualities(&["A"], &[Some("X")]);
        let outcome = apply_group_by(
            subset.clone(),
            None,
            Some(&qualities),
            Vec::new(),
            SUBJECT,
            TIME,
        )
        .unwrap();
        assert_eq!(outcome.data.height(), subset.height());
        assert!(outcome.stats.is_empty());
        assert!(outcome.diags.is_empty());
    }

    /// With a `GroupBy` but no plans the stage is still a passthrough.
    #[test]
    fn apply_group_by_with_empty_plans_is_passthrough() {
        let subset = build_subset(&["A"], &[0], "sump", &[Some(1.0)]);
        let qualities = build_qualities(&["A"], &[Some("X")]);
        let gb = group_by(&["parish"], MissingQualityPolicy::SyntheticGroup);
        let outcome = apply_group_by(
            subset.clone(),
            Some(&gb),
            Some(&qualities),
            Vec::new(),
            SUBJECT,
            TIME,
        )
        .unwrap();
        assert_eq!(outcome.data.height(), subset.height());
        assert!(outcome.stats.is_empty());
        assert!(outcome.diags.is_empty());
    }

    /// An empty quality list is rejected.
    #[test]
    fn empty_qualities_list_errors() {
        let subset = build_subset(&["A"], &[0], "sump", &[Some(1.0)]);
        let qualities = build_qualities(&["A"], &[Some("X")]);
        let gb = GroupBy {
            qualities: Vec::new(),
            aggregation_override: None,
            missing_policy: MissingQualityPolicy::SyntheticGroup,
        };
        let plans = vec![plan("sump", Aggregate::Mean)];
        let err = run_group_by(&subset, &qualities, &gb, &plans, SUBJECT, TIME);
        assert!(err.is_err());
    }

    /// Grouping by a quality that the qualities frame lacks is rejected.
    #[test]
    fn missing_quality_in_qualities_df_errors() {
        let subset = build_subset(&["A"], &[0], "sump", &[Some(1.0)]);
        let qualities = build_qualities(&["A"], &[Some("X")]);
        let gb = group_by(&["region"], MissingQualityPolicy::SyntheticGroup);
        let plans = vec![plan("sump", Aggregate::Mean)];
        let err = run_group_by(&subset, &qualities, &gb, &plans, SUBJECT, TIME);
        assert!(err.is_err());
    }

    /// A whole-window interval pass followed by group-by yields exactly one
    /// row per group.
    #[test]
    fn whole_window_interval_then_group_by_collapses_to_one_row_per_group() {
        use crate::interval::planner::{AggregationSource, ResamplingPath, ResamplingPlan};
        use crate::interval::run_interval as run_interval_fn;
        use crate::interval::{IntervalBucket, RateStrategy, ReportInterval};
        let subset = build_subset(
            &["A", "A", "A", "B", "B", "C", "C", "C"],
            &[0, 1, 2, 0, 1, 0, 1, 2],
            "sump",
            &[
                Some(1.0),
                Some(2.0),
                Some(3.0),
                Some(10.0),
                Some(20.0),
                Some(100.0),
                Some(200.0),
                Some(300.0),
            ],
        );
        let qualities = build_qualities(
            &["A", "B", "C"],
            &[Some("Orleans"), Some("Orleans"), Some("Jefferson")],
        );
        let interval_plans = vec![ResamplingPlan {
            measurement: CanonicalColumnName::new("sump"),
            path: ResamplingPath::Aggregate,
            target_rate_ms: i64::MAX,
            native_rate_ms: Some(86_400_000),
            aggregation: Aggregate::Mean,
            aggregation_source: AggregationSource::Schema,
            reason: "fixture".into(),
        }];
        let _ = ReportInterval {
            bucket: IntervalBucket::WholeWindow,
            strategy: RateStrategy::Auto,
            aggregation_override: None,
            empty_bucket: crate::interval::EmptyBucketPolicy::Null,
        };
        let interval_out = run_interval_fn(
            &subset,
            &interval_plans,
            &IntervalBucket::WholeWindow,
            SUBJECT,
            TIME,
        )
        .unwrap();
        assert_eq!(
            interval_out.data.height(),
            3,
            "WholeWindow yields one row per subject",
        );
        let gb = group_by(&["parish"], MissingQualityPolicy::SyntheticGroup);
        let group_plans = vec![plan("sump", Aggregate::Mean)];
        let group_out = run_group_by(
            &interval_out.data,
            &qualities,
            &gb,
            &group_plans,
            SUBJECT,
            TIME,
        )
        .unwrap();
        assert_eq!(group_out.data.height(), 2, "2 parishes → 2 rows",);
        assert_eq!(group_out.stats.len(), 2);
        let orleans = group_out
            .stats
            .iter()
            .find(|s| s.group_label == "Orleans")
            .unwrap();
        assert_eq!(orleans.n_subjects_contributing, 2);
        assert_eq!(orleans.value, Some(8.5));
        let jefferson = group_out
            .stats
            .iter()
            .find(|s| s.group_label == "Jefferson")
            .unwrap();
        assert_eq!(jefferson.n_subjects_contributing, 1);
        assert_eq!(jefferson.value, Some(200.0));
    }
}