use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use super::{aggregation::AggregationSpec, interval::Interval};
use crate::{
CanonicalColumnName, aggregation::SyntheticSubject, component_filter::ComponentFilters,
};
/// Pairs a quality column with a list of string values.
///
/// NOTE(review): presumably rows are kept when the quality's value is one of
/// `values`; the matching logic is not visible in this file — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityFilter {
/// Canonical name of the quality column this filter refers to.
pub quality: CanonicalColumnName,
/// String values associated with `quality` for this filter.
pub values: Vec<String>,
}
/// Request value carrying column selections plus optional filters, interval
/// settings, synthetic subjects and grouping.
///
/// Serde behavior visible here: `measurements`, `qualities` and
/// `synthetic_subjects` fall back to an empty `Vec` when missing from the
/// input; every `Option` field is left out of the serialized output when it
/// is `None`, and `synthetic_subjects` is left out when it is empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EtlUnitSubsetRequest {
/// Canonical names of the measurement columns listed in the request.
#[serde(default)]
pub measurements: Vec<CanonicalColumnName>,
/// Canonical names of the quality columns listed in the request.
#[serde(default)]
pub qualities: Vec<CanonicalColumnName>,
/// Optional include/exclude filter on subjects.
#[serde(skip_serializing_if = "Option::is_none")]
pub subject_filter: Option<SubjectFilter>,
/// Optional filter on a single quality column.
#[serde(skip_serializing_if = "Option::is_none")]
pub quality_filter: Option<QualityFilter>,
/// Optional time window; either bound may be open.
#[serde(skip_serializing_if = "Option::is_none")]
pub time_range: Option<TimeRange>,
/// Optional per-component filters. The impl's `has_components_to_crush`
/// returns `true` when this is `None` or empty.
#[serde(skip_serializing_if = "Option::is_none")]
pub component_filters: Option<ComponentFilters>,
/// Deprecated: the attribute's note asks callers to set aggregation at the
/// measurement level instead.
#[deprecated(since = "0.1.0", note = "Please set at the measurement level")]
#[serde(skip_serializing_if = "Option::is_none")]
pub aggregation_override: Option<AggregationSpec>,
/// Synthetic subjects attached to the request; may be empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub synthetic_subjects: Vec<SyntheticSubject>,
/// Optional interval; when set, `needs_aggregation` returns `true`.
#[serde(skip_serializing_if = "Option::is_none")]
pub interval: Option<Interval>,
/// Optional report interval.
/// NOTE(review): how this differs from `interval` is not visible here — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub report_interval: Option<crate::interval::ReportInterval>,
/// Optional grouping specification.
#[serde(skip_serializing_if = "Option::is_none")]
pub group_by: Option<crate::group::GroupBy>,
}
impl EtlUnitSubsetRequest {
    /// Creates an empty request; equivalent to `Self::default()`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Replaces the list of measurement column names.
    pub fn measurements(mut self, names: Vec<CanonicalColumnName>) -> Self {
        self.measurements = names;
        self
    }

    /// Replaces the list of quality column names.
    pub fn qualities(mut self, names: Vec<CanonicalColumnName>) -> Self {
        self.qualities = names;
        self
    }

    /// Sets the subject filter, overwriting any previous one.
    pub fn subject_filter(mut self, filter: SubjectFilter) -> Self {
        self.subject_filter = Some(filter);
        self
    }

    /// Shorthand for an include-style subject filter built from plain strings.
    pub fn subjects(self, subjects: Vec<String>) -> Self {
        self.subject_filter(SubjectFilter::include_strings(subjects))
    }

    /// Sets the quality filter, overwriting any previous one.
    pub fn quality_filter(mut self, filter: QualityFilter) -> Self {
        self.quality_filter = Some(filter);
        self
    }

    /// Sets the time range, overwriting any previous one.
    pub fn time_range(mut self, range: TimeRange) -> Self {
        self.time_range = Some(range);
        self
    }

    /// Replaces the whole set of component filters with `filters`.
    pub fn with_component_filters(mut self, filters: ComponentFilters) -> Self {
        self.component_filters = Some(filters);
        self
    }

    /// Feeds the current component filters through `update` and stores the
    /// result. Starts from `ComponentFilters::new()` when none are set yet.
    fn map_component_filters(
        mut self,
        update: impl FnOnce(ComponentFilters) -> ComponentFilters,
    ) -> Self {
        let current = self
            .component_filters
            .take()
            .unwrap_or_else(ComponentFilters::new);
        self.component_filters = Some(update(current));
        self
    }

    /// Registers `component` through `ComponentFilters::include_all`.
    pub fn include_component(self, component: impl Into<String>) -> Self {
        self.map_component_filters(|filters| filters.include_all(component))
    }

    /// Registers `component` with `levels` through
    /// `ComponentFilters::include_levels`.
    pub fn filter_component(
        self,
        component: impl Into<String>,
        levels: Vec<serde_json::Value>,
    ) -> Self {
        self.map_component_filters(|filters| filters.include_levels(component, levels))
    }

    /// Registers `component` with `levels` through
    /// `ComponentFilters::exclude_levels`.
    pub fn exclude_component_levels(
        self,
        component: impl Into<String>,
        levels: Vec<serde_json::Value>,
    ) -> Self {
        self.map_component_filters(|filters| filters.exclude_levels(component, levels))
    }

    /// Returns `true` only when component filters are present and non-empty.
    pub fn has_component_filters(&self) -> bool {
        self.component_filters
            .as_ref()
            .is_some_and(|filters| !filters.is_empty())
    }

    /// Stores a request-level aggregation override in the deprecated field.
    #[deprecated(note = "Set aggregation at the measurement level instead.")]
    #[allow(deprecated)]
    pub fn aggregation_override(mut self, spec: AggregationSpec) -> Self {
        self.aggregation_override = Some(spec);
        self
    }

    /// Appends one synthetic subject; earlier ones are kept.
    pub fn with_synthetic_subject(mut self, synthetic: SyntheticSubject) -> Self {
        self.synthetic_subjects.push(synthetic);
        self
    }

    /// Returns `true` when at least one synthetic subject was added.
    pub fn has_synthetic_subjects(&self) -> bool {
        !self.synthetic_subjects.is_empty()
    }

    /// Sets the interval, overwriting any previous one.
    pub fn interval(mut self, interval: Interval) -> Self {
        self.interval = Some(interval);
        self
    }

    /// Parses `s` with `Interval::parse` and sets the result as the interval.
    ///
    /// # Errors
    ///
    /// Returns `EtlError::Config` with a message naming the offending input
    /// and the parser's error when `s` cannot be parsed.
    pub fn interval_str(self, s: &str) -> Result<Self, crate::EtlError> {
        match Interval::parse(s) {
            Ok(parsed) => Ok(self.interval(parsed)),
            Err(e) => Err(crate::EtlError::Config(format!(
                "Invalid interval '{}': {}. Use formats like '15m', '1h', '6h', '1d'",
                s, e
            ))),
        }
    }

    /// Returns `true` when an interval is set.
    pub fn has_interval(&self) -> bool {
        self.interval.is_some()
    }

    /// Sets the report interval, overwriting any previous one.
    pub fn report_interval(mut self, interval: crate::interval::ReportInterval) -> Self {
        self.report_interval = Some(interval);
        self
    }

    /// Returns `true` when a report interval is set.
    pub fn has_report_interval(&self) -> bool {
        self.report_interval.is_some()
    }

    /// Sets the grouping, overwriting any previous one.
    pub fn group_by(mut self, group_by: crate::group::GroupBy) -> Self {
        self.group_by = Some(group_by);
        self
    }

    /// Returns `true` when a grouping is set.
    pub fn has_group_by(&self) -> bool {
        self.group_by.is_some()
    }

    /// Returns `true` when component filters are absent or empty, i.e. the
    /// exact negation of [`Self::has_component_filters`].
    ///
    /// NOTE(review): presumably "crush" means aggregating across component
    /// levels — confirm against the consumer of this flag.
    pub fn has_components_to_crush(&self) -> bool {
        !self.has_component_filters()
    }

    /// Returns `true` when an interval is set or
    /// [`Self::has_components_to_crush`] holds.
    pub fn needs_aggregation(&self) -> bool {
        self.has_interval() || self.has_components_to_crush()
    }
}
/// Include/exclude filter over subject values held as JSON values.
///
/// Serialized in serde's adjacently tagged form: the variant name goes under
/// the `"type"` key and the value list under the `"values"` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "values")]
pub enum SubjectFilter {
/// Values carried by an include-style filter.
Include(Vec<serde_json::Value>),
/// Values carried by an exclude-style filter.
Exclude(Vec<serde_json::Value>),
}
impl SubjectFilter {
pub fn include(values: Vec<serde_json::Value>) -> Self {
Self::Include(values)
}
pub fn exclude(values: Vec<serde_json::Value>) -> Self {
Self::Exclude(values)
}
pub fn include_one(value: impl Into<String>) -> Self {
Self::Include(vec![serde_json::Value::String(value.into())])
}
pub fn include_strings(values: Vec<String>) -> Self {
Self::Include(values.into_iter().map(serde_json::Value::String).collect())
}
}
/// UTC time window in which either bound may be absent.
///
/// A bound that is `None` is left out of the serialized output.
/// NOTE(review): whether the bounds are inclusive or exclusive is decided by
/// the code that applies the range, which is not visible here — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
/// Start of the window, or `None` when no start is set.
#[serde(skip_serializing_if = "Option::is_none")]
pub start: Option<DateTime<Utc>>,
/// End of the window, or `None` when no end is set.
#[serde(skip_serializing_if = "Option::is_none")]
pub end: Option<DateTime<Utc>>,
}
impl TimeRange {
pub fn new(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> Self {
Self { start, end }
}
pub fn from(start: DateTime<Utc>) -> Self {
Self {
start: Some(start),
end: None,
}
}
pub fn until(end: DateTime<Utc>) -> Self {
Self {
start: None,
end: Some(end),
}
}
pub fn between(start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
Self {
start: Some(start),
end: Some(end),
}
}
pub fn last_hours(hours: i64) -> Self {
let end = Utc::now();
let start = end - chrono::Duration::hours(hours);
Self::between(start, end)
}
pub fn last_days(days: i64) -> Self {
let end = Utc::now();
let start = end - chrono::Duration::days(days);
Self::between(start, end)
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for the request builder, subject filters and time ranges.

    use super::*;
    use crate::{
        aggregation::Aggregate, component_filter::LevelFilter,
        request::aggregation::AggregationType,
    };

    /// Builder setters populate the matching fields.
    #[test]
    fn test_basic_request() {
        let req = EtlUnitSubsetRequest::new()
            .measurements(vec!["sump_ft".into(), "fuel_pct".into()])
            .qualities(vec!["station_name".into()])
            .time_range(TimeRange::last_hours(24));
        assert_eq!(req.measurements.len(), 2);
        assert_eq!(req.qualities.len(), 1);
        assert!(req.time_range.is_some());
    }

    /// `include_strings` yields an `Include` variant with one value per input.
    #[test]
    fn test_subject_filter() {
        let filter = SubjectFilter::include_strings(vec!["station_1".into(), "station_2".into()]);
        if let SubjectFilter::Include(values) = filter {
            assert_eq!(values.len(), 2);
        } else {
            panic!("Expected Include variant");
        }
    }

    /// Repeated `include_component` calls accumulate into one filter set.
    #[test]
    fn test_component_filters_include_all() {
        let req = EtlUnitSubsetRequest::new()
            .measurements(vec!["value".into()])
            .include_component("color")
            .include_component("size");
        assert!(req.has_component_filters());
        let filters = req.component_filters.unwrap();
        assert_eq!(filters.len(), 2);
        assert!(filters.includes("color"));
        assert!(filters.includes("size"));
        assert!(filters.get("color").unwrap().includes_all());
    }

    /// `filter_component` records an include-levels filter.
    #[test]
    fn test_component_filters_include_levels() {
        let req = EtlUnitSubsetRequest::new()
            .measurements(vec!["value".into()])
            .filter_component("color", vec!["red".into(), "blue".into()]);
        assert!(req.has_component_filters());
        let filters = req.component_filters.unwrap();
        assert!(filters.includes("color"));
        let level_filter = filters.get("color").unwrap();
        assert!(!level_filter.includes_all());
        assert!(matches!(level_filter.levels, LevelFilter::Include(_)));
    }

    /// `exclude_component_levels` records an exclude-levels filter.
    #[test]
    fn test_component_filters_exclude_levels() {
        let req = EtlUnitSubsetRequest::new()
            .measurements(vec!["value".into()])
            .exclude_component_levels("color", vec!["unknown".into()]);
        assert!(req.has_component_filters());
        let filters = req.component_filters.unwrap();
        assert!(filters.includes("color"));
        let level_filter = filters.get("color").unwrap();
        assert!(matches!(level_filter.levels, LevelFilter::Exclude(_)));
    }

    /// A pre-built `ComponentFilters` value is stored as given.
    #[test]
    fn test_component_filters_with_builder() {
        let req = EtlUnitSubsetRequest::new()
            .measurements(vec!["value".into()])
            .with_component_filters(
                ComponentFilters::new()
                    .include_all("color")
                    .include_levels("size", vec!["small".into(), "medium".into()])
                    .exclude_levels("region", vec!["unknown".into()]),
            );
        assert!(req.has_component_filters());
        let filters = req.component_filters.unwrap();
        assert_eq!(filters.len(), 3);
    }

    /// With no component filters, the request reports components to crush and
    /// therefore needs aggregation even though no interval is set.
    #[test]
    fn test_no_component_filters_means_crush_all() {
        let req = EtlUnitSubsetRequest::new().measurements(vec!["value".into()]);
        assert!(!req.has_component_filters());
        assert!(req.component_filters.is_none());
        // These two checks are what the test name promises.
        assert!(req.has_components_to_crush());
        assert!(!req.has_interval());
        assert!(req.needs_aggregation());
    }

    /// `last_hours(24)` spans exactly 24 hours with both bounds set.
    #[test]
    fn test_time_range_last_hours() {
        let range = TimeRange::last_hours(24);
        assert!(range.start.is_some());
        assert!(range.end.is_some());
        let duration = range.end.unwrap() - range.start.unwrap();
        assert_eq!(duration.num_hours(), 24);
    }

    /// `AggregationSpec::set` records one override per column.
    #[test]
    fn test_aggregation_override() {
        let spec = AggregationSpec::new()
            .set("units_sold".into(), AggregationType::Sum)
            .set("price".into(), AggregationType::Mean);
        assert_eq!(spec.overrides.len(), 2);
        assert_eq!(
            spec.overrides.get(&("units_sold".into())),
            Some(&AggregationType::Sum)
        );
    }

    /// Synthetic subjects accumulate in call order.
    #[test]
    fn test_request_with_synthetic_subject() {
        let req = EtlUnitSubsetRequest::new()
            .measurements(vec!["fuel".into(), "sump".into()])
            .with_synthetic_subject(SyntheticSubject::mean_all("Fleet Average"))
            .with_synthetic_subject(
                SyntheticSubject::new("Custom")
                    .rule("fuel", Aggregate::Mean)
                    .rule("sump", Aggregate::Max),
            );
        assert!(req.has_synthetic_subjects());
        assert_eq!(req.synthetic_subjects.len(), 2);
    }

    /// A synthetic subject built with `group_by` reports itself as grouped.
    #[test]
    fn test_request_with_grouped_synthetic() {
        let req = EtlUnitSubsetRequest::new()
            .measurements(vec!["fuel".into()])
            .with_synthetic_subject(SyntheticSubject::mean_all("{zone} Average").group_by("zone"));
        assert_eq!(req.synthetic_subjects.len(), 1);
        assert!(req.synthetic_subjects[0].is_grouped());
    }

    /// `subjects` is shorthand for an include-style subject filter.
    #[test]
    fn test_subjects_shorthand() {
        let req =
            EtlUnitSubsetRequest::new().subjects(vec!["station_A".into(), "station_B".into()]);
        assert!(req.subject_filter.is_some());
        if let Some(SubjectFilter::Include(values)) = req.subject_filter {
            assert_eq!(values.len(), 2);
        } else {
            panic!("Expected Include variant");
        }
    }

    /// The three component builder methods can be chained on one request.
    #[test]
    fn test_chained_component_filters() {
        let req = EtlUnitSubsetRequest::new()
            .measurements(vec!["value".into()])
            .include_component("color")
            .filter_component("size", vec!["small".into()])
            .exclude_component_levels("region", vec!["unknown".into()]);
        let filters = req.component_filters.unwrap();
        assert_eq!(filters.len(), 3);
        assert!(filters.get("color").unwrap().includes_all());
        assert!(matches!(
            filters.get("size").unwrap().levels,
            LevelFilter::Include(_)
        ));
        assert!(matches!(
            filters.get("region").unwrap().levels,
            LevelFilter::Exclude(_)
        ));
    }
}