use std::sync::Arc;
use serde::{Deserialize, Serialize};
use super::crush::CrushPlan;
use super::filter::FilterPlan;
use super::source_context::{SourceContext, SourceKey};
/// Bundles the two plans that make up an extraction: a [`FilterPlan`] and a
/// [`CrushPlan`].
///
/// The sources exposed by this type's accessors are read from the entries of
/// the filter plan.
#[derive(Debug, Clone, Default)]
#[derive(Serialize, Deserialize)]
pub struct ExtractionCore {
    /// Filter plan; each entry of its `filters` list carries a source context.
    pub filter: FilterPlan,
    /// Crush plan carried alongside the filter plan.
    pub crush: CrushPlan,
}
impl ExtractionCore {
pub fn new(filter: FilterPlan, crush: CrushPlan) -> Self {
Self { filter, crush }
}
pub fn empty() -> Self {
Self::default()
}
pub fn sources(&self) -> impl Iterator<Item = &Arc<SourceContext>> {
self.filter.filters.iter().map(|f| &f.source)
}
pub fn source_count(&self) -> usize {
self.filter.filters.len()
}
pub fn lookup_source(&self, key: SourceKey) -> Option<&Arc<SourceContext>> {
self.sources().find(|s| s.source_key == key)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::aggregation::Aggregate;
    use crate::plan::bindings::{CodomainBinding, ColumnBinding};
    use crate::plan::crush::{Crush, CrushMember};
    use crate::plan::filter::SourceFilter;
    use crate::plan::source_context::SourceMember;
    use crate::unit_ref::EtlUnitRef;
    use crate::universe::measurement_storage::DataSourceName;

    // Fixture: the "scada" source (key 0xA1) with no components and a single
    // `sump` measurement member.
    fn scada() -> Arc<SourceContext> {
        let context = SourceContext {
            source_name: DataSourceName::new("scada"),
            source_key: SourceKey::from_raw(0xA1),
            subject: ColumnBinding::new("station_id", "station_name"),
            time: ColumnBinding::new("obs_time", "timestamp"),
            components: Vec::new(),
            members: vec![SourceMember::new(
                EtlUnitRef::measurement("sump"),
                CodomainBinding::new("sump_reading", "sump"),
            )],
        };
        Arc::new(context)
    }

    // Fixture: the "scada_engines" source (key 0xE9) with one `engine_number`
    // component and a single `engines_on_count` measurement member.
    fn engines_unpivoted() -> Arc<SourceContext> {
        let context = SourceContext {
            source_name: DataSourceName::new("scada_engines"),
            source_key: SourceKey::from_raw(0xE9),
            subject: ColumnBinding::identity("station_name"),
            time: ColumnBinding::identity("timestamp"),
            components: vec![ColumnBinding::identity("engine_number")],
            members: vec![SourceMember::new(
                EtlUnitRef::measurement("engines_on_count"),
                CodomainBinding::new("engine_on", "engines_on_count"),
            )],
        };
        Arc::new(context)
    }

    // Builds a filter plan with one `SourceFilter` per context, in the given
    // order, passing `None` for both optional `from_context` arguments.
    fn plan_over(contexts: Vec<Arc<SourceContext>>) -> FilterPlan {
        let filters = contexts
            .into_iter()
            .map(|ctx| SourceFilter::from_context(ctx, None, None))
            .collect();
        FilterPlan { filters }
    }

    #[test]
    fn empty_core_has_zero_sources() {
        let core = ExtractionCore::empty();

        assert_eq!(core.source_count(), 0);
        assert!(core.sources().next().is_none());
    }

    #[test]
    fn sources_iter_walks_the_filter_plan_in_order() {
        let first = scada();
        let second = engines_unpivoted();
        let filter = plan_over(vec![Arc::clone(&first), Arc::clone(&second)]);

        let core = ExtractionCore::new(filter, CrushPlan::empty());
        assert_eq!(core.source_count(), 2);

        let seen: Vec<SourceKey> = core.sources().map(|ctx| ctx.source_key).collect();
        assert_eq!(seen, vec![first.source_key, second.source_key]);
    }

    #[test]
    fn lookup_source_finds_by_key() {
        let first = scada();
        let engines = engines_unpivoted();
        // Capture the key before the context is moved into the plan.
        let wanted = engines.source_key;

        let core = ExtractionCore::new(plan_over(vec![first, engines]), CrushPlan::empty());

        let hit = core
            .lookup_source(wanted)
            .expect("should find engines source");
        assert_eq!(hit.source_key, wanted);
        assert_eq!(hit.source_name.as_str(), "scada_engines");
    }

    #[test]
    fn lookup_source_returns_none_for_unknown_key() {
        let filter = plan_over(vec![scada()]);
        let core = ExtractionCore::new(filter, CrushPlan::empty());

        assert!(core.lookup_source(SourceKey::from_raw(0xDEAD)).is_none());
    }

    #[test]
    fn core_carries_crush_alongside_filter() {
        let engines = engines_unpivoted();
        let filter = plan_over(vec![Arc::clone(&engines)]);

        let crush = CrushPlan {
            crushes: vec![Crush::components(
                engines,
                vec![CrushMember::new(
                    EtlUnitRef::measurement("engines_on_count"),
                    CodomainBinding::new("engine_on", "engines_on_count"),
                    Aggregate::Sum,
                )],
            )],
        };

        let core = ExtractionCore::new(filter, crush);
        assert_eq!(core.source_count(), 1);
        assert_eq!(core.crush.op_count(), 1);
        assert_eq!(core.crush.components_count(), 1);
    }

    #[test]
    fn serde_roundtrip_extraction_core() {
        let original = ExtractionCore::new(plan_over(vec![scada()]), CrushPlan::empty());

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ExtractionCore = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.source_count(), 1);
    }
}