use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::column::SourceColumnName;
use crate::request::TimeRange;
use crate::subject::SubjectValue;
use crate::unit::NullValue;
use crate::unit_ref::EtlUnitRef;
use super::source_context::{SourceContext, SourceKey};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct NullFill {
pub column: SourceColumnName,
pub value: NullValue,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceFilter {
pub source: Arc<SourceContext>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub time_range: Option<TimeRange>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub subject_set: Option<Vec<SubjectValue>>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub null_fills: Vec<NullFill>,
pub consumers: Vec<EtlUnitRef>,
}
impl SourceFilter {
pub fn from_context(
source: Arc<SourceContext>,
time_range: Option<TimeRange>,
subject_set: Option<Vec<SubjectValue>>,
) -> Self {
let null_fills = source
.members
.iter()
.filter_map(|m| {
m.value.source_null_fill.as_ref().map(|fill| NullFill {
column: m.value.physical.clone(),
value: fill.clone(),
})
})
.collect();
let consumers = source.members.iter().map(|m| m.unit.clone()).collect();
Self {
source,
time_range,
subject_set,
null_fills,
consumers,
}
}
pub fn source_key(&self) -> SourceKey {
self.source.source_key
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct FilterPlan {
pub filters: Vec<SourceFilter>,
}
impl FilterPlan {
pub fn empty() -> Self {
Self::default()
}
pub fn build(
sources: impl IntoIterator<Item = Arc<SourceContext>>,
time_range: Option<TimeRange>,
subject_set: Option<Vec<SubjectValue>>,
) -> Self {
let filters = sources
.into_iter()
.map(|src| SourceFilter::from_context(src, time_range.clone(), subject_set.clone()))
.collect();
Self { filters }
}
pub fn pass_count(&self) -> usize {
self.filters.len()
}
pub fn consumer_count(&self) -> usize {
self.filters.iter().map(|f| f.consumers.len()).sum()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::plan::bindings::{CodomainBinding, ColumnBinding};
use crate::plan::source_context::SourceMember;
use crate::universe::measurement_storage::DataSourceName;
fn make_scada() -> Arc<SourceContext> {
Arc::new(SourceContext {
source_name: DataSourceName::new("scada"),
source_key: SourceKey::from_raw(0xA1),
subject: ColumnBinding::new("station_id", "station_name"),
time: ColumnBinding::new("obs_time", "timestamp"),
components: vec![],
members: vec![
SourceMember::new(
EtlUnitRef::measurement("sump"),
CodomainBinding::new("sump_reading", "sump"),
),
SourceMember::new(
EtlUnitRef::measurement("discharge"),
CodomainBinding::new("discharge_reading", "discharge"),
),
SourceMember::new(
EtlUnitRef::measurement("engines_on_count"),
CodomainBinding::new("engine_count", "engines_on_count")
.with_source_null_fill(NullValue::Integer(0)),
),
],
})
}
fn make_mrms() -> Arc<SourceContext> {
Arc::new(SourceContext {
source_name: DataSourceName::new("mrms"),
source_key: SourceKey::from_raw(0xB2),
subject: ColumnBinding::new("station_name", "station_name"),
time: ColumnBinding::new("timestamp", "timestamp"),
components: vec![],
members: vec![SourceMember::new(
EtlUnitRef::measurement("historical_precip"),
CodomainBinding::new("value_mm", "historical_precip")
.with_source_null_fill(NullValue::Float(0.0)),
)],
})
}
#[test]
fn from_context_lists_all_members_as_consumers() {
let scada = make_scada();
let f = SourceFilter::from_context(scada, None, None);
assert_eq!(f.consumers.len(), 3);
assert!(f.consumers.iter().any(|u| u.as_str() == "sump"));
assert!(f.consumers.iter().any(|u| u.as_str() == "discharge"));
assert!(f.consumers.iter().any(|u| u.as_str() == "engines_on_count"));
}
#[test]
fn from_context_extracts_only_members_with_source_null_fill() {
let scada = make_scada();
let f = SourceFilter::from_context(scada, None, None);
assert_eq!(f.null_fills.len(), 1);
assert_eq!(f.null_fills[0].column.as_str(), "engine_count");
assert_eq!(f.null_fills[0].value, NullValue::Integer(0));
}
#[test]
fn from_context_no_null_fills_when_none_declared() {
let ctx = Arc::new(SourceContext {
source_name: DataSourceName::new("clean"),
source_key: SourceKey::from_raw(0xC3),
subject: ColumnBinding::identity("station"),
time: ColumnBinding::identity("time"),
components: vec![],
members: vec![SourceMember::new(
EtlUnitRef::measurement("v"),
CodomainBinding::new("v", "v"),
)],
});
let f = SourceFilter::from_context(ctx, None, None);
assert!(f.null_fills.is_empty());
}
#[test]
fn from_context_propagates_request_filters() {
let scada = make_scada();
let subjects = vec![SubjectValue::new("Coastal"), SubjectValue::new("Parr")];
let f = SourceFilter::from_context(scada, None, Some(subjects.clone()));
assert_eq!(f.subject_set.as_deref(), Some(subjects.as_slice()));
assert!(f.time_range.is_none());
}
#[test]
fn source_key_round_trips_through_filter() {
let scada = make_scada();
let key = scada.source_key;
let f = SourceFilter::from_context(scada, None, None);
assert_eq!(f.source_key(), key);
}
#[test]
fn build_creates_one_filter_per_source() {
let plan = FilterPlan::build([make_scada(), make_mrms()], None, None);
assert_eq!(plan.pass_count(), 2);
}
#[test]
fn build_consumer_count_sums_across_sources() {
let plan = FilterPlan::build([make_scada(), make_mrms()], None, None);
assert_eq!(plan.consumer_count(), 4);
}
#[test]
fn pass_count_vs_consumer_count_is_the_optimization_metric() {
let plan = FilterPlan::build([make_scada()], None, None);
assert_eq!(plan.pass_count(), 1);
assert_eq!(plan.consumer_count(), 3);
}
#[test]
fn empty_plan_has_zero_passes_and_consumers() {
let plan = FilterPlan::empty();
assert_eq!(plan.pass_count(), 0);
assert_eq!(plan.consumer_count(), 0);
}
#[test]
fn serde_roundtrip_filter_plan() {
let plan = FilterPlan::build([make_scada(), make_mrms()], None, None);
let json = serde_json::to_string(&plan).unwrap();
let back: FilterPlan = serde_json::from_str(&json).unwrap();
assert_eq!(back.pass_count(), 2);
assert_eq!(back.consumer_count(), 4);
}
#[test]
fn serde_skips_empty_optional_fields() {
let scada = make_scada();
let f = SourceFilter::from_context(scada, None, None);
let json = serde_json::to_string(&f).unwrap();
assert!(!json.contains("time_range"));
assert!(!json.contains("subject_set"));
assert!(json.contains("null_fills"));
}
}