use chrono::{DateTime, TimeZone as _, Utc};
use etl_unit::{
CanonicalColumnName, EtlSchema, EtlUnitSubsetRequest, MeasurementKind,
request::TimeRange,
signal_policy::SignalPolicy,
source::{BoundSource, EtlUniverseBuildPlan},
unit::NullValue,
universe::{Universe, UniverseBuilder, alignment::AlignmentSpec},
};
use polars::prelude::*;
use std::time::Duration;
// Name of the subject column; used both when building the source frame and
// when declaring the schema's subject.
const SUBJECT_COL: &str = "station_id";
// Name of the timestamp column; used both when building the source frame and
// when declaring the schema's time axis.
const TIME_COL: &str = "observation_time";
// The single subject value written into every row of the source frame.
const SUBJECT: &str = "S";
/// Reference instant that all minute offsets in these tests are relative to:
/// 2026-01-01 09:00:00 UTC.
fn base_time() -> DateTime<Utc> {
    let (year, month, day) = (2026, 1, 1);
    let (hour, minute, second) = (9, 0, 0);
    Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
        .unwrap()
}
/// Builds a three-column source frame for the single station [`SUBJECT`].
///
/// * `value_col` - name given to the measurement column.
/// * `minute_offsets` - observation times, in whole minutes relative to
///   [`base_time`].
/// * `values` - one measurement value per entry of `minute_offsets`.
///
/// The resulting frame has the columns [`SUBJECT_COL`] (string),
/// [`TIME_COL`] (millisecond datetime tagged UTC) and `value_col` (f64).
///
/// # Panics
///
/// Panics if the two slices differ in length or if polars rejects the
/// columns when assembling the frame.
fn one_station_source(value_col: &str, minute_offsets: &[i64], values: &[f64]) -> DataFrame {
    assert_eq!(
        minute_offsets.len(),
        values.len(),
        "each minute offset needs exactly one value"
    );

    // The base instant is the same for every row, so compute it once.
    let base_ms = base_time().timestamp_millis();
    let times: Vec<i64> = minute_offsets
        .iter()
        .map(|offset| base_ms + offset * 60_000)
        .collect();
    let subjects: Vec<&str> = vec![SUBJECT; minute_offsets.len()];

    // Epoch milliseconds are reinterpreted as a UTC datetime column.
    let time_ca = Int64Chunked::new(TIME_COL.into(), &times)
        .into_datetime(TimeUnit::Milliseconds, Some(polars::prelude::TimeZone::UTC));

    DataFrame::new(vec![
        Column::new(SUBJECT_COL.into(), &subjects),
        time_ca.into_column(),
        Column::new(value_col.into(), values),
    ])
    .unwrap()
}
/// Builds a [`Universe`] from `df` bound to `schema` as the identity source
/// named `"default"`, then computes alignment over every measurement declared
/// in the schema before handing the universe back.
///
/// # Panics
///
/// Panics if the universe cannot be built or alignment fails.
fn build(schema: &EtlSchema, df: DataFrame) -> Universe {
    let source = BoundSource::identity("default", df, schema);
    let plan = EtlUniverseBuildPlan::new(schema.clone()).source(source);

    let mut universe = UniverseBuilder::build(&plan).expect("universe should build");

    // Align on the canonical names of all measurements in the schema.
    let names = schema
        .measurements
        .iter()
        .map(|measurement| measurement.name.clone())
        .collect::<Vec<CanonicalColumnName>>();
    let spec = AlignmentSpec::compute(&universe.measurements, &names);

    universe
        .ensure_aligned(spec)
        .expect("alignment should compute");

    universe
}
/// Creates a subset request for a single `measurement` whose time range runs
/// from one hour before [`base_time`] to two hours after it.
fn full_window_request(measurement: &str) -> EtlUnitSubsetRequest {
    let anchor = base_time();
    let window = TimeRange::new(
        Some(anchor - chrono::Duration::hours(1)),
        Some(anchor + chrono::Duration::hours(2)),
    );

    EtlUnitSubsetRequest::new()
        .measurements(vec![measurement.into()])
        .time_range(window)
}
/// Four observations (minutes 0, 1, 3, 4) go in; the raw subset is expected to
/// come back with exactly four rows and no nulls in `sump`.
#[test]
fn raw_returns_one_row_per_observation() {
    let offsets: [i64; 4] = [0, 1, 3, 4];
    let readings: [f64; 4] = [1.0, 1.5, 2.0, 2.5];
    let df = one_station_source("sump", &offsets, &readings);

    let policy = SignalPolicy::instant().with_ttl(Duration::from_secs(60));
    let schema = EtlSchema::new("raw_test")
        .subject(SUBJECT_COL)
        .time(TIME_COL)
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .with_policy(policy)
        .with_sample_rate(60_000)
        .build()
        .unwrap();

    let universe = build(&schema, df);
    let request = full_window_request("sump");
    let raw = universe.subset_raw(&request).unwrap();
    let frame = raw.dataframe();

    assert_eq!(
        frame.height(),
        4,
        "raw subset must have exactly one row per observation",
    );

    let null_cells = frame.column("sump").unwrap().null_count();
    assert_eq!(null_cells, 0, "raw observations were all non-null");
}
/// Observations at minutes 0, 1, 3, 4 with no `null_value` configured: the
/// processed subset is expected to have five rows, with one null in `sump`
/// (the unobserved minute 2).
#[test]
fn processed_dense_grid_no_null_value_keeps_gaps_as_null() {
    let offsets: [i64; 4] = [0, 1, 3, 4];
    let readings: [f64; 4] = [1.0, 1.5, 2.0, 2.5];
    let df = one_station_source("sump", &offsets, &readings);

    let policy = SignalPolicy::instant().with_ttl(Duration::from_secs(60));
    let schema = EtlSchema::new("proc_no_nv")
        .subject(SUBJECT_COL)
        .time(TIME_COL)
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .with_policy(policy)
        .with_sample_rate(60_000)
        .build()
        .unwrap();

    let universe = build(&schema, df);
    let request = full_window_request("sump");
    let processed = universe.subset(&request).unwrap();
    let frame = processed.dataframe();

    assert_eq!(
        frame.height(),
        5,
        "dense grid must cover every minute from min to max observation",
    );

    let null_cells = frame.column("sump").unwrap().null_count();
    assert_eq!(
        null_cells,
        1,
        "the gap at minute 2 must be null when no null_value is configured",
    );
}
/// Same observations as the no-`null_value` case, but with
/// `NullValue::Float(0.0)` configured: the processed subset is expected to
/// have five rows, no nulls, and `0.0` at the unobserved minute 2.
#[test]
fn processed_dense_grid_with_null_value_fills_gaps() {
    let offsets: [i64; 4] = [0, 1, 3, 4];
    let readings: [f64; 4] = [1.0, 1.5, 2.0, 2.5];
    let df = one_station_source("sump", &offsets, &readings);

    let policy = SignalPolicy::instant().with_ttl(Duration::from_secs(60));
    let schema = EtlSchema::new("proc_with_nv")
        .subject(SUBJECT_COL)
        .time(TIME_COL)
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .with_policy(policy)
        .with_sample_rate(60_000)
        .with_null_value(NullValue::Float(0.0))
        .build()
        .unwrap();

    let universe = build(&schema, df);
    let request = full_window_request("sump");
    let processed = universe.subset(&request).unwrap();
    let frame = processed.dataframe();

    assert_eq!(frame.height(), 5, "dense grid still 5 cells");

    let null_cells = frame.column("sump").unwrap().null_count();
    assert_eq!(
        null_cells,
        0,
        "null_value = 0 must fill the gap at minute 2",
    );

    // Order by time so values can be compared position by position.
    let by_time = frame
        .sort([TIME_COL], SortMultipleOptions::default())
        .unwrap();
    let sump_values = by_time.column("sump").unwrap().f64().unwrap();
    let values: Vec<Option<f64>> = sump_values.into_iter().collect();

    let expected: Vec<Option<f64>> =
        vec![Some(1.0), Some(1.5), Some(0.0), Some(2.0), Some(2.5)];
    assert_eq!(
        values,
        expected,
        "gap at minute 2 filled with null_value, not forward-filled",
    );
}
/// Observations at minutes 0 and 3 with a 60 s TTL and a 60 000 sample rate:
/// the processed subset is expected to have four rows, with the two cells in
/// between left null.
#[test]
fn ttl_equal_sample_rate_does_not_carry_forward() {
    let offsets: [i64; 2] = [0, 3];
    let readings: [f64; 2] = [1.0, 4.0];
    let df = one_station_source("sump", &offsets, &readings);

    let policy = SignalPolicy::instant().with_ttl(Duration::from_secs(60));
    let schema = EtlSchema::new("ttl_eq_sr")
        .subject(SUBJECT_COL)
        .time(TIME_COL)
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .with_policy(policy)
        .with_sample_rate(60_000)
        .build()
        .unwrap();

    let universe = build(&schema, df);
    let request = full_window_request("sump");
    let processed = universe.subset(&request).unwrap();
    let frame = processed.dataframe();

    assert_eq!(frame.height(), 4);

    // Order by time so values can be compared position by position.
    let by_time = frame
        .sort([TIME_COL], SortMultipleOptions::default())
        .unwrap();
    let sump_values = by_time.column("sump").unwrap().f64().unwrap();
    let values: Vec<Option<f64>> = sump_values.into_iter().collect();

    let expected: Vec<Option<f64>> = vec![Some(1.0), None, None, Some(4.0)];
    assert_eq!(
        values,
        expected,
        "TTL == sample_rate: no forward-fill; gaps stay null",
    );
}
/// Observations at minutes 0 and 3 with a 90 000 ms TTL: the value from
/// minute 0 is expected to appear at minute 1 as well, while minute 2 stays
/// null.
#[test]
fn ttl_one_cell_carry_forward() {
    let offsets: [i64; 2] = [0, 3];
    let readings: [f64; 2] = [1.0, 4.0];
    let df = one_station_source("sump", &offsets, &readings);

    let policy = SignalPolicy::instant().with_ttl(Duration::from_millis(90_000));
    let schema = EtlSchema::new("ttl_90")
        .subject(SUBJECT_COL)
        .time(TIME_COL)
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .with_policy(policy)
        .with_sample_rate(60_000)
        .build()
        .unwrap();

    let universe = build(&schema, df);
    let request = full_window_request("sump");
    let processed = universe.subset(&request).unwrap();

    // Order by time so values can be compared position by position.
    let by_time = processed
        .dataframe()
        .sort([TIME_COL], SortMultipleOptions::default())
        .unwrap();
    let sump_values = by_time.column("sump").unwrap().f64().unwrap();
    let values: Vec<Option<f64>> = sump_values.into_iter().collect();

    let expected: Vec<Option<f64>> = vec![Some(1.0), Some(1.0), None, Some(4.0)];
    assert_eq!(
        values,
        expected,
        "observation at minute 0 carries forward 1 cell (to minute 1); minute 2 expires",
    );
}
/// Observations at minutes 0 and 3 with a 121 000 ms TTL: the value from
/// minute 0 is expected to appear at minutes 1 and 2 as well, and minute 3
/// keeps its own observation.
#[test]
fn ttl_two_cell_carry_forward() {
    let offsets: [i64; 2] = [0, 3];
    let readings: [f64; 2] = [1.0, 4.0];
    let df = one_station_source("sump", &offsets, &readings);

    let policy = SignalPolicy::instant().with_ttl(Duration::from_millis(121_000));
    let schema = EtlSchema::new("ttl_121")
        .subject(SUBJECT_COL)
        .time(TIME_COL)
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .with_policy(policy)
        .with_sample_rate(60_000)
        .build()
        .unwrap();

    let universe = build(&schema, df);
    let request = full_window_request("sump");
    let processed = universe.subset(&request).unwrap();

    // Order by time so values can be compared position by position.
    let by_time = processed
        .dataframe()
        .sort([TIME_COL], SortMultipleOptions::default())
        .unwrap();
    let sump_values = by_time.column("sump").unwrap().f64().unwrap();
    let values: Vec<Option<f64>> = sump_values.into_iter().collect();

    let expected: Vec<Option<f64>> = vec![Some(1.0), Some(1.0), Some(1.0), Some(4.0)];
    assert_eq!(
        values,
        expected,
        "observation at minute 0 carries forward 2 cells (1 and 2); minute 3 has its own observation",
    );
}
/// Observations at minutes 0 and 5 with a 90 000 ms TTL and
/// `NullValue::Float(-1.0)`: minute 1 is expected to repeat the minute-0
/// value, minutes 2 to 4 are expected to hold `-1.0`, and no nulls remain.
#[test]
fn ttl_carries_first_then_null_value_fills_remainder() {
    let offsets: [i64; 2] = [0, 5];
    let readings: [f64; 2] = [1.0, 6.0];
    let df = one_station_source("sump", &offsets, &readings);

    let policy = SignalPolicy::instant().with_ttl(Duration::from_millis(90_000));
    let schema = EtlSchema::new("ttl_then_nv")
        .subject(SUBJECT_COL)
        .time(TIME_COL)
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .with_policy(policy)
        .with_sample_rate(60_000)
        .with_null_value(NullValue::Float(-1.0))
        .build()
        .unwrap();

    let universe = build(&schema, df);
    let request = full_window_request("sump");
    let processed = universe.subset(&request).unwrap();
    let frame = processed.dataframe();

    // Order by time so values can be compared position by position.
    let by_time = frame
        .sort([TIME_COL], SortMultipleOptions::default())
        .unwrap();
    let sump_values = by_time.column("sump").unwrap().f64().unwrap();
    let values: Vec<Option<f64>> = sump_values.into_iter().collect();

    let expected: Vec<Option<f64>> = vec![
        Some(1.0),
        Some(1.0),
        Some(-1.0),
        Some(-1.0),
        Some(-1.0),
        Some(6.0),
    ];
    assert_eq!(
        values,
        expected,
        "TTL fills first within the staleness window, null_value fills the rest",
    );

    let null_cells = frame.column("sump").unwrap().null_count();
    assert_eq!(
        null_cells,
        0,
        "with null_value configured, no nulls survive into the processed output",
    );
}
/// Runs the same observations (minutes 0, 1, 3, 4, 6, 7, 9) through two
/// schemas that differ only in whether a `null_value` is set. Both processed
/// subsets are expected to have ten rows; only the null counts differ
/// (3 without `null_value`, 0 with it).
#[test]
fn processed_cardinality_independent_of_null_value() {
    let observations = &[0, 1, 3, 4, 6, 7, 9];
    let values = &[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0];

    // Schema A: no null_value.
    let policy_a = SignalPolicy::instant().with_ttl(Duration::from_secs(60));
    let schema_a = EtlSchema::new("nv_off")
        .subject(SUBJECT_COL)
        .time(TIME_COL)
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .with_policy(policy_a)
        .with_sample_rate(60_000)
        .build()
        .unwrap();

    // Schema B: identical apart from null_value = 0.0.
    let policy_b = SignalPolicy::instant().with_ttl(Duration::from_secs(60));
    let schema_b = EtlSchema::new("nv_on")
        .subject(SUBJECT_COL)
        .time(TIME_COL)
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .with_policy(policy_b)
        .with_sample_rate(60_000)
        .with_null_value(NullValue::Float(0.0))
        .build()
        .unwrap();

    let df_a = one_station_source("sump", observations, values);
    let df_b = one_station_source("sump", observations, values);
    let req = full_window_request("sump");

    let proc_a = build(&schema_a, df_a).subset(&req).unwrap();
    let proc_b = build(&schema_b, df_b).subset(&req).unwrap();
    let frame_a = proc_a.dataframe();
    let frame_b = proc_b.dataframe();

    assert_eq!(frame_a.height(), 10);
    assert_eq!(frame_b.height(), 10);
    assert_eq!(
        frame_a.height(),
        frame_b.height(),
        "null_value affects nulls, not row count",
    );

    let nulls_a = frame_a.column("sump").unwrap().null_count();
    let nulls_b = frame_b.column("sump").unwrap().null_count();
    assert_eq!(nulls_a, 3, "without null_value: 3 gap cells stay null");
    assert_eq!(nulls_b, 0, "with null_value: gaps filled, no nulls survive");
}
/// Takes both the raw and the processed subset of the same universe
/// (observations at minutes 0, 1, 3, 4; no `null_value`) and checks that the
/// extra rows in the processed output (5 vs. 4) equal its null count.
#[test]
fn raw_processed_row_count_difference_equals_unobserved_cells() {
    let offsets: [i64; 4] = [0, 1, 3, 4];
    let readings: [f64; 4] = [1.0, 1.5, 2.0, 2.5];
    let df = one_station_source("sump", &offsets, &readings);

    let policy = SignalPolicy::instant().with_ttl(Duration::from_secs(60));
    let schema = EtlSchema::new("delta")
        .subject(SUBJECT_COL)
        .time(TIME_COL)
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .with_policy(policy)
        .with_sample_rate(60_000)
        .build()
        .unwrap();

    let req = full_window_request("sump");
    let universe = build(&schema, df);

    let raw = universe.subset_raw(&req).unwrap();
    let processed = universe.subset(&req).unwrap();

    let raw_rows = raw.dataframe().height();
    let processed_frame = processed.dataframe();
    let proc_rows = processed_frame.height();
    let proc_nulls = processed_frame.column("sump").unwrap().null_count();

    assert_eq!(raw_rows, 4);
    assert_eq!(proc_rows, 5);

    let added_rows = proc_rows - raw_rows;
    assert_eq!(
        added_rows,
        proc_nulls,
        "the rows processed adds beyond raw equal the null-filled gap cells",
    );
}