use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::aggregation::Aggregate;
use crate::unit_ref::EtlUnitRef;
use super::bindings::CodomainBinding;
use super::source_context::{SourceContext, SourceKey};
/// One entry of a [`Crush`]: a unit reference together with a codomain
/// binding and an [`Aggregate`].
///
/// NOTE(review): presumably `aggregation` is the function used to collapse
/// the unit's values when the owning crush is applied — confirm against the
/// code that executes a `CrushPlan`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrushMember {
/// Reference to the ETL unit this member describes.
pub unit: EtlUnitRef,
/// Codomain binding associated with `unit`.
pub value: CodomainBinding,
/// Aggregate recorded for this member.
pub aggregation: Aggregate,
}
impl CrushMember {
pub fn new(unit: EtlUnitRef, value: CodomainBinding, aggregation: Aggregate) -> Self {
Self {
unit,
value,
aggregation,
}
}
}
/// A single crush operation over one source.
///
/// Both variants carry the same payload: a shared [`SourceContext`] and the
/// list of [`CrushMember`]s involved. Only the variant itself differs.
///
/// With serde the enum is internally tagged: the variant name is written to a
/// `kind` field in snake_case, i.e. `"components"` or `"time"`.
///
/// NOTE(review): presumably the variant names the dimension being aggregated
/// away (component columns vs. the time column) — confirm against the
/// executor.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Crush {
/// Crush of kind `components`.
Components {
/// Source context this crush refers to.
source: Arc<SourceContext>,
/// Members taking part in the crush.
members: Vec<CrushMember>,
},
/// Crush of kind `time`.
Time {
/// Source context this crush refers to.
source: Arc<SourceContext>,
/// Members taking part in the crush.
members: Vec<CrushMember>,
},
}
impl Crush {
pub fn components(source: Arc<SourceContext>, members: Vec<CrushMember>) -> Self {
Self::Components { source, members }
}
pub fn time(source: Arc<SourceContext>, members: Vec<CrushMember>) -> Self {
Self::Time { source, members }
}
pub fn source(&self) -> &Arc<SourceContext> {
match self {
Self::Components { source, .. } | Self::Time { source, .. } => source,
}
}
pub fn source_key(&self) -> SourceKey {
self.source().source_key
}
pub fn members(&self) -> &[CrushMember] {
match self {
Self::Components { members, .. } | Self::Time { members, .. } => members,
}
}
pub fn is_components(&self) -> bool {
matches!(self, Self::Components { .. })
}
pub fn is_time(&self) -> bool {
matches!(self, Self::Time { .. })
}
}
/// A list of [`Crush`] operations.
///
/// The derived `Default` yields a plan with no crushes.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CrushPlan {
/// The crush operations, in the order they were added.
pub crushes: Vec<Crush>,
}
impl CrushPlan {
pub fn empty() -> Self {
Self::default()
}
pub fn op_count(&self) -> usize {
self.crushes.len()
}
pub fn components_count(&self) -> usize {
self.crushes.iter().filter(|c| c.is_components()).count()
}
pub fn time_count(&self) -> usize {
self.crushes.iter().filter(|c| c.is_time()).count()
}
pub fn member_count(&self) -> usize {
self.crushes.iter().map(|c| c.members().len()).sum()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::bindings::{CodomainBinding, ColumnBinding};
    use crate::plan::source_context::SourceMember;
    use crate::universe::measurement_storage::DataSourceName;

    /// Fixture: the `scada_engines` source with one component column
    /// (`engine_number`) and a single measurement member.
    fn make_scada_engines_unpivoted() -> Arc<SourceContext> {
        let engines_on = SourceMember::new(
            EtlUnitRef::measurement("engines_on_count"),
            CodomainBinding::new("engine_on", "engines_on_count"),
        );
        let context = SourceContext {
            source_name: DataSourceName::new("scada_engines"),
            source_key: SourceKey::from_raw(0xE9),
            subject: ColumnBinding::identity("station_name"),
            time: ColumnBinding::identity("timestamp"),
            components: vec![ColumnBinding::identity("engine_number")],
            members: vec![engines_on],
        };
        Arc::new(context)
    }

    /// Fixture: the `scada` source with no component columns and a single
    /// quality member.
    fn make_quality_source() -> Arc<SourceContext> {
        let station_label = SourceMember::new(
            EtlUnitRef::quality("station_label"),
            CodomainBinding::new("display_name", "station_label"),
        );
        let context = SourceContext {
            source_name: DataSourceName::new("scada"),
            source_key: SourceKey::from_raw(0xA1),
            subject: ColumnBinding::new("station_id", "station_name"),
            time: ColumnBinding::new("obs_time", "timestamp"),
            components: Vec::new(),
            members: vec![station_label],
        };
        Arc::new(context)
    }

    /// Fixture: the `engines_on_count` measurement aggregated with `Sum`.
    fn engines_on_sum() -> CrushMember {
        CrushMember::new(
            EtlUnitRef::measurement("engines_on_count"),
            CodomainBinding::new("engine_on", "engines_on_count"),
            Aggregate::Sum,
        )
    }

    /// Fixture: the `station_label` quality aggregated with `First`.
    fn station_label_first() -> CrushMember {
        CrushMember::new(
            EtlUnitRef::quality("station_label"),
            CodomainBinding::new("display_name", "station_label"),
            Aggregate::First,
        )
    }

    /// Fixture: a components crush over the engines source.
    fn components_crush() -> Crush {
        Crush::components(make_scada_engines_unpivoted(), vec![engines_on_sum()])
    }

    /// Fixture: a time crush over the quality source.
    fn time_crush() -> Crush {
        Crush::time(make_quality_source(), vec![station_label_first()])
    }

    #[test]
    fn components_constructor() {
        let source = make_scada_engines_unpivoted();
        // Copy the key out before the Arc is moved into the crush.
        let expected_key = source.source_key;
        let crush = Crush::components(source, vec![engines_on_sum()]);

        assert!(crush.is_components());
        assert!(!crush.is_time());
        assert_eq!(crush.source_key(), expected_key);
        assert_eq!(crush.members().len(), 1);
        assert_eq!(crush.members()[0].aggregation, Aggregate::Sum);
    }

    #[test]
    fn time_constructor() {
        let source = make_quality_source();
        // Copy the key out before the Arc is moved into the crush.
        let expected_key = source.source_key;
        let crush = Crush::time(source, vec![station_label_first()]);

        assert!(crush.is_time());
        assert!(!crush.is_components());
        assert_eq!(crush.source_key(), expected_key);
    }

    #[test]
    fn empty_plan_counts_are_zero() {
        let plan = CrushPlan::empty();

        assert_eq!(plan.op_count(), 0);
        assert_eq!(plan.components_count(), 0);
        assert_eq!(plan.time_count(), 0);
        assert_eq!(plan.member_count(), 0);
    }

    #[test]
    fn mixed_plan_counts_are_correct() {
        let plan = CrushPlan {
            crushes: vec![components_crush(), time_crush()],
        };

        assert_eq!(plan.op_count(), 2);
        assert_eq!(plan.components_count(), 1);
        assert_eq!(plan.time_count(), 1);
        assert_eq!(plan.member_count(), 2);
    }

    #[test]
    fn serde_roundtrip_components_variant() {
        let original = components_crush();

        let json = serde_json::to_string(&original).unwrap();
        assert!(json.contains("\"kind\":\"components\""));

        let restored: Crush = serde_json::from_str(&json).unwrap();
        assert!(restored.is_components());
    }

    #[test]
    fn serde_roundtrip_time_variant() {
        let original = time_crush();

        let json = serde_json::to_string(&original).unwrap();
        assert!(json.contains("\"kind\":\"time\""));

        let restored: Crush = serde_json::from_str(&json).unwrap();
        assert!(restored.is_time());
    }
}