use std::{path::PathBuf, sync::Once};
use etl_unit::{
Derivation, EtlSchema, EtlUnitSubsetRequest, MeasurementKind, PointwiseExpr, UnpivotConfig,
source::{BoundSource, EtlUniverseBuildPlan},
universe::UniverseBuilder,
};
use polars::prelude::*;
// One-shot guard so the global subscriber is installed at most once per test
// binary, however many tests call `init_tracing`.
#[allow(dead_code)] // only referenced by `init_tracing`, which is itself opt-in
static INIT: Once = Once::new();

/// Opt-in debugging aid: installs a `tracing_subscriber` fmt subscriber whose
/// filter comes from the default environment variable, plus an extra
/// `etl_unit=debug` directive. Output goes through the test harness writer.
///
/// Repeated calls are no-ops thanks to `INIT`. Per the tracing-subscriber docs,
/// `.init()` panics if some other global subscriber was already installed.
#[allow(dead_code)] // not called by default; add a call inside a test when debugging
fn init_tracing() {
    INIT.call_once(|| {
        let filter = tracing_subscriber::EnvFilter::from_default_env()
            .add_directive("etl_unit=debug".parse().unwrap());
        tracing_subscriber::fmt()
            .with_env_filter(filter)
            .with_test_writer()
            .init();
    });
}
/// Returns `<CARGO_MANIFEST_DIR>/tests/fixtures/<filename>`, creating the
/// `tests/fixtures` directory first if needed.
///
/// # Panics
/// Panics if the fixtures directory cannot be created.
fn fixture_path(filename: &str) -> PathBuf {
    let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("tests")
        .join("fixtures");
    // `create_dir_all` already returns Ok when the directory exists (including
    // when another thread creates it concurrently), so a separate `exists()`
    // probe adds nothing.
    std::fs::create_dir_all(&dir).expect("Failed to create fixtures directory");
    dir.join(filename)
}
// Per-process caches of generated fixture paths. Each cell is filled by its
// `ensure_*` helper via `get_or_init`, so a fixture is materialized at most
// once per test binary run even when tests run on parallel threads.
static SCHEMA_FIXTURE: std::sync::OnceLock<PathBuf> = std::sync::OnceLock::new();
static REQUEST_FIXTURE: std::sync::OnceLock<PathBuf> = std::sync::OnceLock::new();
static PUMP_CSV_FIXTURE: std::sync::OnceLock<PathBuf> = std::sync::OnceLock::new();
/// Writes the `pump_telemetry` schema TOML into the fixtures directory (once
/// per test process) and returns its path.
///
/// The schema declares:
/// - two `measure` signals (`sump`, `fuel`);
/// - two `categorical` signals (`engine_1`, `engine_2`);
/// - two pointwise derivations over the engines: `any_on` and
///   `count_non_zero`.
///
/// # Panics
/// Panics if the fixture file cannot be written.
fn ensure_schema_fixture() -> PathBuf {
    const SCHEMA_TOML: &str = r#"
name = "pump_telemetry"
subject = "station_id"
time = "observation_time"
[measurements.sump]
kind = "measure"
sample_rate = "60s"
signal_policy = { max_staleness = "60s", windowing = { type = "instant" } }
[measurements.fuel]
kind = "measure"
sample_rate = "60s"
signal_policy = { max_staleness = "60s", windowing = { type = "instant" } }
[measurements.engine_1]
kind = "categorical"
sample_rate = "60s"
signal_policy = { max_staleness = "60s", windowing = { type = "instant" } }
[measurements.engine_2]
kind = "categorical"
sample_rate = "60s"
signal_policy = { max_staleness = "60s", windowing = { type = "instant" } }
[derivations.any_engine_running]
kind = "categorical"
computation.pointwise = { type = "any_on", inputs = ["engine_1", "engine_2"] }
[derivations.engines_running_count]
kind = "count"
computation.pointwise = { type = "count_non_zero", inputs = ["engine_1", "engine_2"] }
"#;

    let cached = SCHEMA_FIXTURE.get_or_init(|| {
        let target = fixture_path("pump_telemetry_schema.toml");
        std::fs::write(&target, SCHEMA_TOML).expect("Failed to write schema fixture");
        target
    });
    cached.clone()
}
/// Writes a JSON `EtlUnitSubsetRequest` fixture (once per test process) and
/// returns its path.
///
/// The request asks for `sump`, `fuel` and `any_engine_running`, with no
/// qualities, and includes only subject `Station_A`.
///
/// # Panics
/// Panics if the fixture file cannot be written.
fn ensure_subset_request_fixture() -> PathBuf {
    const REQUEST_JSON: &str = r#"{
"NOTE": "Auto-generated",
"measurements": ["sump", "fuel", "any_engine_running"],
"qualities": [],
"subject_filter": {
"type": "Include",
"values": ["Station_A"]
}
}"#;

    let cached = REQUEST_FIXTURE.get_or_init(|| {
        let target = fixture_path("subset_request.json");
        std::fs::write(&target, REQUEST_JSON).expect("Failed to write subset request fixture");
        target
    });
    cached.clone()
}
fn ensure_pump_data_csv() -> PathBuf {
PUMP_CSV_FIXTURE
.get_or_init(|| materialize_pump_data_csv())
.clone()
}
/// Generates `pump_data_sample.csv` in the fixtures directory and returns its
/// path.
///
/// The file has a header plus 20 rows for `Station_A`, one per second from
/// `2023-10-27 10:00:00.000` to `10:00:19.000`:
/// - `engine_1` alternates 1/0, starting at 1;
/// - `engine_2` is 0 for the first ten rows and 1 afterwards;
/// - `fuel` is always `80.0`.
///
/// NOTE(review): `sump` is formatted as `10.{i}`, so it reads 10.0..10.9 and
/// then 10.10..10.19. The values are not monotonic. No visible test depends on
/// them, but confirm this is intended.
///
/// The file is rewritten on every call, like the other fixture writers. Tests
/// depend on this exact content (e.g. the row count after resampling), so a
/// stale file left over from an older generator must not be reused.
///
/// # Panics
/// Panics if the fixture file cannot be written.
fn materialize_pump_data_csv() -> PathBuf {
    use std::fmt::Write as _;

    let path = fixture_path("pump_data_sample.csv");

    // Build the whole file in memory and write it with a single call, instead
    // of issuing one unbuffered write per line.
    let mut csv = String::from("station_id,observation_time,sump,fuel,engine_1,engine_2\n");
    for i in 0..20 {
        let e1 = u8::from(i % 2 == 0);
        let e2 = u8::from(i >= 10);
        writeln!(
            csv,
            "Station_A,2023-10-27 10:00:{:02}.000,10.{},80.0,{},{}",
            i, i, e1, e2
        )
        .expect("writing to a String cannot fail");
    }

    std::fs::write(&path, csv).expect("Failed to write CSV fixture");
    path
}
/// Reads the generated pump CSV (with header) into a `DataFrame`. The
/// `observation_time` string column is replaced by a millisecond-precision
/// UTC datetime, parsed with the format `%Y-%m-%d %H:%M:%S%.f`.
///
/// The trailing `lit("raise")` is presumably polars' ambiguous-time policy
/// argument; confirm against the polars version in use.
///
/// # Panics
/// Panics if the CSV cannot be opened or read, or if the datetime conversion
/// fails when the lazy frame is collected.
fn load_pump_data() -> DataFrame {
    let csv_path = ensure_pump_data_csv();
    let raw = CsvReadOptions::default()
        .with_has_header(true)
        .try_into_reader_with_file_path(Some(csv_path))
        .expect("Failed to create CSV reader")
        .finish()
        .expect("Failed to read CSV");

    let parse_time = col("observation_time").str().to_datetime(
        Some(TimeUnit::Milliseconds),
        Some(polars::prelude::TimeZone::UTC),
        StrptimeOptions {
            format: Some("%Y-%m-%d %H:%M:%S%.f".into()),
            ..Default::default()
        },
        lit("raise"),
    );

    raw.lazy()
        .with_column(parse_time)
        .collect()
        .expect("Failed to parse datetime")
}
/// Builds a universe from a single in-memory frame. The frame is bound to
/// `schema` through an identity source named `"default"`.
///
/// # Panics
/// Panics if the universe build fails.
fn build_universe(schema: &EtlSchema, df: DataFrame) -> etl_unit::universe::Universe {
    let source = BoundSource::identity("default", df, schema);
    let plan = EtlUniverseBuildPlan::new(schema.clone()).source(source);
    UniverseBuilder::build(&plan).expect("Failed to build universe")
}
/// Loads the generated schema fixture and checks its identity columns and
/// registered units. Despite the test name, the fixture is TOML and is loaded
/// with `from_toml_file`.
#[test]
fn test_load_schema_from_json() {
    let schema = EtlSchema::from_toml_file(&ensure_schema_fixture())
        .expect("Failed to load schema");

    assert_eq!(schema.name, "pump_telemetry");
    assert_eq!(schema.subject.as_str(), "station_id");
    assert_eq!(schema.time.as_str(), "observation_time");

    for measurement in ["sump", "fuel", "engine_1", "engine_2"] {
        assert!(schema.has_measurement(measurement));
    }
    for derivation in ["any_engine_running", "engines_running_count"] {
        assert!(schema.get_derivation(derivation).is_some());
    }
}
/// Signal policies attached through the programmatic builder are stored on the
/// matching measurement:
/// - `sump` gets an instant window;
/// - `fuel` gets a 30s sliding window with `min_samples = 3`.
#[test]
fn test_schema_signal_policies() {
    use etl_unit::signal_policy::{SignalPolicy, WindowStrategy};
    use std::time::Duration;

    // Both measurements share staleness and time format; only windowing differs.
    let policy_with = |windowing: WindowStrategy| SignalPolicy {
        max_staleness: Duration::from_secs(60),
        windowing,
        time_format: Some("%Y-%m-%d %H:%M:%S".into()),
    };

    let schema = EtlSchema::new("policy_test")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .with_policy(policy_with(WindowStrategy::Instant))
        .with_sample_rate(60_000)
        .measurement_with_defaults("fuel", MeasurementKind::Measure)
        .with_policy(policy_with(WindowStrategy::Sliding {
            duration: Duration::from_secs(30),
            min_samples: 3,
        }))
        .with_sample_rate(60_000)
        .build()
        .unwrap();

    let sump_policy = schema
        .get_measurement("sump")
        .expect("sump not found")
        .signal_policy
        .as_ref()
        .expect("sump should have signal policy");
    assert_eq!(sump_policy.max_staleness.as_secs(), 60);
    assert!(matches!(sump_policy.windowing, WindowStrategy::Instant));

    let fuel_policy = schema
        .get_measurement("fuel")
        .expect("fuel not found")
        .signal_policy
        .as_ref()
        .expect("fuel should have signal policy");
    assert_eq!(fuel_policy.max_staleness.as_secs(), 60);

    let WindowStrategy::Sliding {
        duration,
        min_samples,
    } = &fuel_policy.windowing
    else {
        panic!("Expected sliding window for fuel");
    };
    assert_eq!(duration.as_secs(), 30);
    assert_eq!(*min_samples, 3);
}
/// The generated JSON request fixture deserializes into an
/// `EtlUnitSubsetRequest` and keeps its measurement list (in order) and its
/// subject filter.
#[test]
fn test_load_subset_request_from_json() {
    let raw_json =
        std::fs::read_to_string(ensure_subset_request_fixture()).expect("Failed to read file");
    let request: EtlUnitSubsetRequest =
        serde_json::from_str(&raw_json).expect("Failed to parse JSON");

    assert_eq!(
        request.measurements,
        vec!["sump".into(), "fuel".into(), "any_engine_running".into()]
    );
    assert!(request.subject_filter.is_some());
}
/// Subsetting a TOML-defined universe returns both raw and derived columns.
/// On the first (up to) 20 rows, `any_engine_running` must equal 1 exactly
/// when either engine value is positive.
///
/// NOTE(review): rows where any of the three values cannot be extracted as a
/// number are skipped. The loop can therefore pass without comparing a single
/// row; consider asserting that at least one row was checked.
#[test]
fn test_execute_subset_with_derivations() {
    let schema = EtlSchema::from_toml_file(&ensure_schema_fixture())
        .expect("Failed to load schema");
    let universe = build_universe(&schema, load_pump_data());

    let request = EtlUnitSubsetRequest::new().measurements(vec![
        "sump".into(),
        "engine_1".into(),
        "engine_2".into(),
        "any_engine_running".into(),
    ]);
    let subset = universe.subset(&request).expect("Subset failed");
    let data = &subset.data;

    for name in ["sump", "engine_1", "engine_2", "any_engine_running"] {
        assert!(data.column(name).is_ok());
    }

    let engine_1 = data.column("engine_1").unwrap();
    let engine_2 = data.column("engine_2").unwrap();
    let any_engine = data.column("any_engine_running").unwrap();

    for row in 0..data.height().min(20) {
        let e1: Option<f64> = engine_1.get(row).ok().and_then(|v| v.try_extract().ok());
        let e2: Option<f64> = engine_2.get(row).ok().and_then(|v| v.try_extract().ok());
        let any: Option<i32> = any_engine.get(row).ok().and_then(|v| v.try_extract().ok());

        let (Some(e1), Some(e2), Some(any)) = (e1, e2, any) else {
            continue;
        };
        let expected = i32::from(e1 > 0.0 || e2 > 0.0);
        assert_eq!(any, expected, "any_engine_running mismatch at row {}", row);
    }
}
/// A builder-defined schema with `any_on` and `count_non_zero` derivations
/// registers both derivations. A subset requesting them together with `sump`
/// yields all three columns.
#[test]
#[ignore = "TODO(0.2): subset projection drops measurement columns when using \
            BoundSource::identity on a multi-measurement schema — only subject + \
            time + sometimes the first measurement survive. Production data-pipeline \
            doesn't hit this path; unblock by either repro-ing with explicit BoundSource \
            mappings or auditing the universe→subset projection."]
fn test_programmatic_schema_with_derivations() {
    let data = load_pump_data();
    let engines = vec!["engine_1", "engine_2"];
    let schema = EtlSchema::new("test_schema")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .measurement_with_defaults("fuel", MeasurementKind::Measure)
        .measurement_with_defaults("engine_1", MeasurementKind::Categorical)
        .measurement_with_defaults("engine_2", MeasurementKind::Categorical)
        .with_derivation(Derivation::pointwise(
            "any_engine",
            PointwiseExpr::any_on(engines.clone()),
        ))
        .with_derivation(Derivation::pointwise(
            "engine_count",
            PointwiseExpr::count_non_zero(engines),
        ))
        .build()
        .unwrap();

    for derivation in ["any_engine", "engine_count"] {
        assert!(schema.get_derivation(derivation).is_some());
    }

    let universe = build_universe(&schema, data);
    let request = EtlUnitSubsetRequest::new().measurements(vec![
        "sump".into(),
        "any_engine".into(),
        "engine_count".into(),
    ]);
    let subset = universe.subset(&request).expect("Subset failed");

    for name in ["sump", "any_engine", "engine_count"] {
        assert!(subset.data.column(name).is_ok());
    }
}
/// Subset metadata reports each requested unit with the kind declared in the
/// schema: `Measure` for `sump` and `Categorical` for the `any_engine_running`
/// derivation.
#[test]
#[ignore = "TODO(0.2): same column-drop pattern as test_programmatic_schema_with_derivations."]
fn test_subset_universe_metadata() {
    let schema = EtlSchema::from_toml_file(&ensure_schema_fixture())
        .expect("Failed to load schema");
    let universe = build_universe(&schema, load_pump_data());

    let subset = universe
        .subset(
            &EtlUnitSubsetRequest::new()
                .measurements(vec!["sump".into(), "any_engine_running".into()]),
        )
        .expect("Subset failed");

    let sump = subset
        .get_measurement("sump")
        .expect("sump metadata missing");
    assert_eq!(sump.kind, MeasurementKind::Measure);

    let any_engine = subset
        .get_measurement("any_engine_running")
        .expect("any_engine_running metadata missing");
    assert_eq!(any_engine.kind, MeasurementKind::Categorical);
}
/// A schema survives a `serde_json` pretty-print round trip with its name, its
/// measurement and derivation counts, and the named derivation intact.
#[test]
fn test_schema_serialization_roundtrip() {
    let original = EtlSchema::new("roundtrip_test")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .measurement_with_defaults("engine_1", MeasurementKind::Categorical)
        .with_derivation(Derivation::pointwise(
            "any_engine",
            PointwiseExpr::any_on(vec!["engine_1"]),
        ))
        .build()
        .unwrap();

    let encoded = serde_json::to_string_pretty(&original).expect("Failed to serialize");
    let restored: EtlSchema = serde_json::from_str(&encoded).expect("Failed to deserialize");

    assert_eq!(restored.name, original.name);
    assert_eq!(restored.measurements.len(), original.measurements.len());
    assert_eq!(restored.derivations.len(), original.derivations.len());
    assert!(restored.get_derivation("any_engine").is_some());
}
/// A `count_non_zero` derivation over two engines (kind `Count`) only yields
/// values in `0..=2` on the first (up to) 20 rows. Values that cannot be
/// extracted as `i32` are skipped.
#[test]
#[ignore = "TODO(0.2): same column-drop pattern as test_programmatic_schema_with_derivations."]
fn test_count_non_zero_derivation() {
    let data = load_pump_data();
    let running = Derivation::pointwise(
        "engines_running",
        PointwiseExpr::count_non_zero(vec!["engine_1", "engine_2"]),
    )
    .with_kind(MeasurementKind::Count);
    let schema = EtlSchema::new("count_test")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("engine_1", MeasurementKind::Categorical)
        .measurement_with_defaults("engine_2", MeasurementKind::Categorical)
        .with_derivation(running)
        .build()
        .unwrap();

    let universe = build_universe(&schema, data);
    let request = EtlUnitSubsetRequest::new().measurements(vec!["engines_running".into()]);
    let subset = universe.subset(&request).expect("Subset failed");
    let engines_running = subset.data.column("engines_running").unwrap();

    for row in 0..subset.data.height().min(20) {
        let value: Option<i32> = engines_running
            .get(row)
            .ok()
            .and_then(|v| v.try_extract().ok());
        let Some(count) = value else {
            continue;
        };
        assert!((0..=2).contains(&count), "Invalid engine count: {}", count);
    }
}
/// A request that lists no measurements returns every measurement and
/// derivation declared in the TOML schema.
#[test]
fn test_request_all_measurements() {
    let schema = EtlSchema::from_toml_file(&ensure_schema_fixture())
        .expect("Failed to load schema");
    let universe = build_universe(&schema, load_pump_data());

    let subset = universe
        .subset(&EtlUnitSubsetRequest::new())
        .expect("Subset failed");

    for name in [
        "sump",
        "fuel",
        "engine_1",
        "engine_2",
        "any_engine_running",
        "engines_running_count",
    ] {
        assert!(subset.data.column(name).is_ok());
    }
}
/// Unpivots the wide `engine_1` / `engine_2` columns into one `engine_status`
/// measurement tagged with an `engine_id` component ("1" / "2"). Checks that:
/// - the component column is absent from the subset output;
/// - the 20 one-second samples collapse to a single row.
#[test]
fn test_execute_with_unpivot() {
    let data = load_pump_data();
    let schema = EtlSchema::new("unpivot_test")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("engine_status", MeasurementKind::Categorical)
        .with_component("engine_id")
        .build()
        .unwrap();

    let unpivot = UnpivotConfig::creates("engine_status", MeasurementKind::Categorical)
        .subject("station_id", "station_id")
        .time("observation_time", "observation_time")
        .component("engine_id")
        .from_source("engine_1", [("engine_id", "1")])
        .from_source("engine_2", [("engine_id", "2")])
        .build();

    let plan = EtlUniverseBuildPlan::new(schema.clone())
        .source(BoundSource::identity("default", data, &schema).unpivot(unpivot));
    let universe = UniverseBuilder::build(&plan).expect("Failed to build universe");

    let request = EtlUnitSubsetRequest::new().measurements(vec!["engine_status".into()]);
    let subset = universe.subset(&request).expect("Subset failed");
    let output = &subset.data;

    assert!(output.column("engine_status").is_ok());
    assert!(
        output.column("engine_id").is_err(),
        "engine_id should be crushed out of the subset output"
    );

    let rows = output.height();
    assert_eq!(
        rows, 1,
        "Expected 1 row after 60s-bucket resample + crush, got {}",
        rows
    );
}
/// A subset keeps only the requested units plus the subject and time columns.
/// Derivation inputs (`engine_1`, `engine_2`) and unrequested measurements
/// (`fuel`) are projected out of both the data and the metadata.
#[test]
#[ignore = "TODO(0.2): same column-drop pattern as test_programmatic_schema_with_derivations."]
fn test_subset_projects_to_requested_columns_only() {
    let schema = EtlSchema::from_toml_file(&ensure_schema_fixture())
        .expect("Failed to load schema");
    let universe = build_universe(&schema, load_pump_data());

    let subset = universe
        .subset(
            &EtlUnitSubsetRequest::new()
                .measurements(vec!["sump".into(), "any_engine_running".into()]),
        )
        .expect("Subset failed");
    let data = &subset.data;

    assert!(data.column("sump").is_ok(), "sump should be present");
    assert!(
        data.column("any_engine_running").is_ok(),
        "any_engine_running should be present"
    );

    assert!(
        data.column("engine_1").is_err(),
        "engine_1 was a dependency, not requested — should be projected out"
    );
    assert!(
        data.column("engine_2").is_err(),
        "engine_2 was a dependency, not requested — should be projected out"
    );
    assert!(
        data.column("fuel").is_err(),
        "fuel was not requested — should be absent"
    );

    assert!(data.column("station_id").is_ok());
    assert!(data.column("observation_time").is_ok());

    let listed: Vec<&str> = subset
        .measurements
        .iter()
        .map(|unit| unit.column.as_str())
        .collect();
    assert_eq!(
        listed.len(),
        2,
        "metadata should list only requested measurements: {:?}",
        listed
    );
    assert!(listed.contains(&"sump"));
    assert!(listed.contains(&"any_engine_running"));
}
/// Four measurements from the same source are handled by wide join(s) that
/// together cover all four members. No per-measurement `JoinMeasurement`
/// stage may appear in the stage trace.
#[test]
fn test_wide_join_handles_four_member_group() {
    use etl_unit::subset::stages::SubsetStage;

    let members = ["sump", "fuel", "engine_1", "engine_2"];

    let data = load_pump_data();
    let schema = EtlSchema::new("scada_four_member")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .measurement_with_defaults("fuel", MeasurementKind::Measure)
        .measurement_with_defaults("engine_1", MeasurementKind::Categorical)
        .measurement_with_defaults("engine_2", MeasurementKind::Categorical)
        .build()
        .unwrap();
    let universe = build_universe(&schema, data);

    let request = EtlUnitSubsetRequest::new().measurements(vec![
        "sump".into(),
        "fuel".into(),
        "engine_1".into(),
        "engine_2".into(),
    ]);
    let subset = universe.subset(&request).expect("Subset failed");

    // Named `name` rather than `col` to avoid shadowing polars' `col()`.
    for name in members {
        assert!(subset.data.column(name).is_ok(), "missing column {}", name);
    }

    let mut wide_members: Vec<String> = Vec::new();
    let mut narrow: Vec<String> = Vec::new();
    for diag in &subset.info.stage_trace {
        match &diag.stage {
            SubsetStage::WideJoin { measurements, .. } => wide_members.extend(measurements.clone()),
            SubsetStage::JoinMeasurement { measurement } => narrow.push(measurement.clone()),
            _ => {}
        }
    }

    assert_eq!(
        wide_members.len(),
        4,
        "expected 4 wide members, got {:?}",
        wide_members
    );
    for name in members {
        assert!(
            wide_members.iter().any(|m| m == name),
            "wide_join missing {} (members: {:?})",
            name,
            wide_members
        );
    }
    assert!(
        narrow.is_empty(),
        "all four members should be wide-joined, found narrow joins for: {:?}",
        narrow
    );
}
/// A two-measurement request with an explicit 10s interval (the test name
/// calls this the upsample case) still runs through exactly one `WideJoin`
/// covering both members, with no per-measurement joins.
#[test]
fn test_wide_join_handles_upsample_case() {
    use etl_unit::subset::stages::SubsetStage;
    use etl_unit::Interval;
    use std::time::Duration;

    let data = load_pump_data();
    let schema = EtlSchema::new("wide_upsample_test")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .measurement_with_defaults("fuel", MeasurementKind::Measure)
        .build()
        .unwrap();
    let universe = build_universe(&schema, data);

    let request = EtlUnitSubsetRequest::new()
        .measurements(vec!["sump".into(), "fuel".into()])
        .interval(Interval::new(Duration::from_secs(10)));
    let subset = universe
        .subset(&request)
        .expect("upsample wide subset failed");

    assert!(subset.data.column("sump").is_ok());
    assert!(subset.data.column("fuel").is_ok());

    let mut wide_count = 0usize;
    let mut wide_members: Vec<String> = Vec::new();
    let mut narrow: Vec<String> = Vec::new();
    for diag in &subset.info.stage_trace {
        if let SubsetStage::WideJoin { measurements, .. } = &diag.stage {
            wide_count += 1;
            wide_members.extend(measurements.clone());
        } else if let SubsetStage::JoinMeasurement { measurement } = &diag.stage {
            narrow.push(measurement.clone());
        }
    }

    assert_eq!(
        wide_count, 1,
        "upsample case should produce one wide_join, got {}",
        wide_count
    );
    assert!(wide_members.iter().any(|m| m == "sump"));
    assert!(wide_members.iter().any(|m| m == "fuel"));
    assert!(
        narrow.is_empty(),
        "upsample wide path should fully replace per-measurement loop, found narrow: {:?}",
        narrow
    );
}
/// A single-measurement request must not take the wide-join path. Instead,
/// `sump` must show up as a per-measurement `JoinMeasurement` stage.
#[test]
#[ignore = "TODO(0.2): stage_trace shape changed — assertion looks for \
            SubsetStage::JoinMeasurement which the subset pipeline no \
            longer emits for this case. Update assertion when stage_trace API stabilises."]
fn test_wide_join_skips_single_member_request() {
    use etl_unit::subset::stages::SubsetStage;

    let data = load_pump_data();
    let schema = EtlSchema::new("single_member_test")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .measurement_with_defaults("fuel", MeasurementKind::Measure)
        .build()
        .unwrap();
    let universe = build_universe(&schema, data);

    let request = EtlUnitSubsetRequest::new().measurements(vec!["sump".into()]);
    let subset = universe.subset(&request).expect("Subset failed");
    let trace = &subset.info.stage_trace;

    let has_wide = trace
        .iter()
        .any(|d| matches!(d.stage, SubsetStage::WideJoin { .. }));
    let has_narrow_sump = trace.iter().any(|d| {
        matches!(
            &d.stage,
            SubsetStage::JoinMeasurement { measurement } if measurement == "sump"
        )
    });

    assert!(
        !has_wide,
        "single-member request should not take the wide path"
    );
    assert!(
        has_narrow_sump,
        "sump should be processed by the per-measurement loop"
    );
}
/// Two measurements from one shared source are combined by exactly one
/// `WideJoin` stage containing both, with no per-measurement joins. On
/// failure, the full stage trace is printed.
#[test]
fn test_wide_join_fires_for_shared_source() {
    use etl_unit::subset::stages::SubsetStage;

    let data = load_pump_data();
    let schema = EtlSchema::new("wide_join_test")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .measurement_with_defaults("fuel", MeasurementKind::Measure)
        .build()
        .unwrap();
    let universe = build_universe(&schema, data);

    let request = EtlUnitSubsetRequest::new().measurements(vec!["sump".into(), "fuel".into()]);
    let subset = universe.subset(&request).expect("Subset failed");

    assert!(subset.data.column("sump").is_ok());
    assert!(subset.data.column("fuel").is_ok());

    let trace = &subset.info.stage_trace;
    let mut wide_join_count = 0usize;
    let mut wide_join_members: Vec<String> = Vec::new();
    let mut narrow_join_members: Vec<String> = Vec::new();
    for diag in trace {
        match &diag.stage {
            SubsetStage::WideJoin { measurements, .. } => {
                wide_join_count += 1;
                wide_join_members.extend(measurements.iter().cloned());
            }
            SubsetStage::JoinMeasurement { measurement } => {
                narrow_join_members.push(measurement.clone());
            }
            _ => {}
        }
    }

    let stages: Vec<_> = trace.iter().map(|d| &d.stage).collect();
    assert_eq!(
        wide_join_count,
        1,
        "Expected exactly one wide_join stage, got {}. Stage trace: {:?}",
        wide_join_count,
        stages
    );
    assert!(
        wide_join_members.contains(&"sump".to_string()),
        "wide_join did not include sump (members: {:?})",
        wide_join_members
    );
    assert!(
        wide_join_members.contains(&"fuel".to_string()),
        "wide_join did not include fuel (members: {:?})",
        wide_join_members
    );
    assert!(
        narrow_join_members.is_empty(),
        "sump and fuel should have been wide-joined, not per-measurement. \
         Found narrow joins for: {:?}",
        narrow_join_members
    );
}
/// One built universe can serve several independent subset requests. Single-
/// and two-measurement subsets report matching metadata counts and share the
/// same row count.
#[test]
fn test_two_phase_execution() {
    let data = load_pump_data();
    let schema = EtlSchema::new("two_phase_test")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .measurement_with_defaults("fuel", MeasurementKind::Measure)
        .build()
        .unwrap();

    // Phase 1: build once.
    let universe = build_universe(&schema, data);
    assert_eq!(universe.schema().name, "two_phase_test");
    assert!(universe.has_measurement("sump"));
    assert!(universe.has_measurement("fuel"));

    // Phase 2: subset repeatedly from the same universe.
    let sump_only = universe
        .subset(&EtlUnitSubsetRequest::new().measurements(vec!["sump".into()]))
        .expect("Subset 1 failed");
    let fuel_only = universe
        .subset(&EtlUnitSubsetRequest::new().measurements(vec!["fuel".into()]))
        .expect("Subset 2 failed");
    let both = universe
        .subset(&EtlUnitSubsetRequest::new().measurements(vec!["sump".into(), "fuel".into()]))
        .expect("Subset both failed");

    assert_eq!(sump_only.measurements.len(), 1);
    assert_eq!(fuel_only.measurements.len(), 1);
    assert_eq!(both.measurements.len(), 2);

    let rows = sump_only.data.height();
    assert_eq!(rows, fuel_only.data.height());
    assert_eq!(rows, both.data.height());
}
/// Build info on a freshly built universe records the schema name, at least
/// one source, non-zero row and subject counts, and a non-zero build duration.
#[test]
fn test_universe_build_info() {
    let data = load_pump_data();
    let schema = EtlSchema::new("build_info_test")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .measurement_with_defaults("engine_1", MeasurementKind::Categorical)
        .with_derivation(Derivation::pointwise(
            "any_engine",
            PointwiseExpr::any_on(vec!["engine_1"]),
        ))
        .build()
        .unwrap();

    let universe = build_universe(&schema, data);
    let info = universe.build_info();

    assert_eq!(info.schema_name, "build_info_test");
    assert!(!info.sources_used.is_empty());
    assert!(info.row_count > 0);
    assert!(info.subject_count > 0);
    assert!(info.build_duration.as_nanos() > 0);
}
use etl_unit::QualityFilter;
use etl_unit::universe::{QualityData, Universe};
/// Builds a universe with no measurements and a single `region` quality over
/// three stations: `Station_A` and `Station_C` in `North`, `Station_B` in
/// `South`. The quality data is inserted directly rather than through the
/// builder.
fn build_quality_only_universe() -> Universe {
    let schema = EtlSchema::new("quality_test")
        .subject("station_id")
        .time("observation_time")
        .quality("region")
        .build()
        .unwrap();

    let region_unit = schema.get_quality("region").unwrap().clone();
    let region_table = df! {
        "station_id" => ["Station_A", "Station_B", "Station_C"],
        "region" => ["North", "South", "North"]
    }
    .unwrap();

    let mut universe = Universe::from_schema(schema.clone());
    universe
        .qualities
        .insert("region".into(), QualityData::new(region_unit, region_table));
    universe
}
/// Builds a universe from the sample CSV with a `sump` measurement. It then
/// attaches a `region` quality that places `Station_A` in `North`.
fn build_universe_with_quality() -> Universe {
    let schema = EtlSchema::new("quality_test")
        .subject("station_id")
        .time("observation_time")
        .measurement_with_defaults("sump", MeasurementKind::Measure)
        .quality("region")
        .build()
        .unwrap();

    let mut universe = build_universe(&schema, load_pump_data());

    let region_unit = schema.get_quality("region").unwrap().clone();
    let region_table = df! {
        "station_id" => ["Station_A"],
        "region" => ["North"]
    }
    .unwrap();
    universe
        .qualities
        .insert("region".into(), QualityData::new(region_unit, region_table));
    universe
}
/// Requesting only a quality yields a subset with no measurements and no time
/// column. It has one row per station (three) carrying `station_id` and
/// `region`.
#[test]
fn test_qualities_only_subset() {
    let universe = build_quality_only_universe();
    let subset = universe
        .subset(&EtlUnitSubsetRequest::new().qualities(vec!["region".into()]))
        .expect("Qualities-only subset failed");

    assert!(!subset.has_measurements());
    assert!(subset.has_qualities());
    assert_eq!(subset.qualities.len(), 1);
    assert_eq!(subset.qualities[0].column, "region".into());
    assert!(subset.time_column().is_none());

    let frame = subset.dataframe();
    assert!(frame.column("station_id").is_ok());
    assert!(frame.column("region").is_ok());
    assert_eq!(subset.info.subject_count, 3);
    assert_eq!(frame.height(), 3);
}
/// A quality filter on `region = North` combined with a measurement request
/// keeps only North rows, which here means the single station `Station_A`.
/// Only string-typed region cells are checked row by row.
#[test]
fn test_quality_filter_with_measurements() {
    let universe = build_universe_with_quality();
    let north_only = QualityFilter {
        quality: "region".into(),
        values: vec!["North".to_string()],
    };
    let request = EtlUnitSubsetRequest::new()
        .measurements(vec!["sump".into()])
        .qualities(vec!["region".into()])
        .quality_filter(north_only);

    let subset = universe
        .subset(&request)
        .expect("Quality filter subset failed");
    let frame = subset.dataframe();
    let regions = frame.column("region").unwrap();

    for row in 0..frame.height() {
        let Ok(polars::prelude::AnyValue::String(v)) = regions.get(row) else {
            continue;
        };
        assert_eq!(
            v, "North",
            "Expected all regions to be North, got {} at row {}",
            v, row
        );
    }
    assert_eq!(subset.info.subject_count, 1);
}
/// Filtering a qualities-only universe on `region = South` leaves exactly one
/// subject and one row, with no measurements and no time column.
#[test]
fn test_quality_filter_qualities_only() {
    let universe = build_quality_only_universe();
    let south_only = QualityFilter {
        quality: "region".into(),
        values: vec!["South".to_string()],
    };
    let request = EtlUnitSubsetRequest::new()
        .qualities(vec!["region".into()])
        .quality_filter(south_only);

    let subset = universe
        .subset(&request)
        .expect("Quality filter subset failed");

    assert!(!subset.has_measurements());
    assert!(subset.time_column().is_none());
    assert_eq!(subset.dataframe().height(), 1);
    assert_eq!(subset.info.subject_count, 1);
}
/// A quality filter matches any of its listed values: `North` or `South`
/// covers all three stations, while `North` alone covers two.
#[test]
fn test_quality_filter_multiple_values() {
    let universe = build_quality_only_universe();

    // Runs a qualities-only subset filtered to `values` and reports how many
    // subjects survived.
    let subject_count_for = |values: &[&str]| {
        let request = EtlUnitSubsetRequest::new()
            .qualities(vec!["region".into()])
            .quality_filter(QualityFilter {
                quality: "region".into(),
                values: values.iter().map(|v| v.to_string()).collect(),
            });
        let subset = universe
            .subset(&request)
            .expect("Quality filter subset failed");
        subset.info.subject_count
    };

    assert_eq!(subject_count_for(&["North", "South"]), 3);
    assert_eq!(subject_count_for(&["North"]), 2);
}