use serde::{Deserialize, Serialize};
use crate::unit_ref::EtlUnitRef;
use super::core::ExtractionCore;
use super::join::JoinPlan;
/// A list of unit references that make up the derivation part of a plan.
///
/// The type is a thin, serializable wrapper around a `Vec<EtlUnitRef>`; its
/// `Default` value holds an empty list.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DerivationPlan {
    /// Unit references held by this plan, kept in the order they were supplied.
    // NOTE(review): presumably these name the derived units to compute —
    // confirm against the code that consumes the plan.
    pub derivations: Vec<EtlUnitRef>,
}
impl DerivationPlan {
    /// Creates a plan that holds no derivations.
    ///
    /// Produces the same value as `DerivationPlan::default()`.
    pub fn empty() -> Self {
        Self {
            derivations: Vec::new(),
        }
    }

    /// Creates a plan that takes ownership of `derivations` as-is.
    pub fn from_derivations(derivations: Vec<EtlUnitRef>) -> Self {
        Self { derivations }
    }

    /// Returns how many unit references the plan holds.
    pub fn len(&self) -> usize {
        self.derivations.len()
    }

    /// Returns `true` when the plan holds no unit references.
    pub fn is_empty(&self) -> bool {
        self.derivations.is_empty()
    }
}
/// A complete plan made of an extraction core, a join plan and a derivation
/// plan.
///
/// All three parts are public fields and the whole value is serializable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimePlan {
    /// The extraction core; `source_count` is answered from this part.
    pub core: ExtractionCore,
    /// The join part of the plan.
    pub join: JoinPlan,
    /// The derivation part of the plan.
    pub derivations: DerivationPlan,
}
impl RuntimePlan {
    /// Assembles a plan from its three parts, taking ownership of each.
    pub fn new(core: ExtractionCore, join: JoinPlan, derivations: DerivationPlan) -> Self {
        Self {
            core,
            join,
            derivations,
        }
    }

    /// Creates a plan in which every part comes from that part's own
    /// `empty()` constructor.
    pub fn empty() -> Self {
        Self::new(
            ExtractionCore::empty(),
            JoinPlan::empty(),
            DerivationPlan::empty(),
        )
    }

    /// Returns the source count reported by the extraction core.
    pub fn source_count(&self) -> usize {
        self.core.source_count()
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::plan::bindings::{CodomainBinding, ColumnBinding};
    use crate::plan::crush::CrushPlan;
    use crate::plan::filter::{FilterPlan, SourceFilter};
    use crate::plan::join::{GroupSignalConfig, JoinColumn, JoinKeys, SourceJoin};
    use crate::plan::source_context::{SourceContext, SourceKey, SourceMember};
    use crate::universe::measurement_storage::DataSourceName;

    /// Fixture: a shared `SourceContext` named "scada" with no components and
    /// a single member, the "sump" measurement.
    fn scada() -> Arc<SourceContext> {
        let sump_member = SourceMember::new(
            EtlUnitRef::measurement("sump"),
            CodomainBinding::new("sump_reading", "sump"),
        );
        let context = SourceContext {
            source_name: DataSourceName::new("scada"),
            source_key: SourceKey::from_raw(0xA1),
            subject: ColumnBinding::new("station_id", "station_name"),
            time: ColumnBinding::new("obs_time", "timestamp"),
            components: vec![],
            members: vec![sump_member],
        };
        Arc::new(context)
    }

    /// Builds an extraction core holding one filter derived from `ctx` and an
    /// empty crush plan.
    fn single_source_core(ctx: Arc<SourceContext>) -> ExtractionCore {
        let filters = vec![SourceFilter::from_context(ctx, None, None)];
        ExtractionCore::new(FilterPlan { filters }, CrushPlan::empty())
    }

    /// An empty plan reports zero sources, zero join ops and no derivations.
    #[test]
    fn empty_runtime_plan_is_default_everywhere() {
        let blank = RuntimePlan::empty();

        assert_eq!(blank.source_count(), 0);
        assert_eq!(blank.join.op_count(), 0);
        assert!(blank.derivations.is_empty());
    }

    /// `from_derivations` keeps every reference it was given.
    #[test]
    fn derivation_plan_construction() {
        let refs = vec![
            EtlUnitRef::derivation("total_runtime"),
            EtlUnitRef::derivation("avg_load"),
        ];
        let derivation_plan = DerivationPlan::from_derivations(refs);

        assert_eq!(derivation_plan.len(), 2);
        assert!(!derivation_plan.is_empty());
    }

    /// A plan built from one source, one join and one derivation reports a
    /// count of one for each part.
    #[test]
    fn runtime_plan_composes_core_and_runtime_stages() {
        let ctx = scada();
        // The core and the join each need their own handle to the context.
        let core = single_source_core(Arc::clone(&ctx));

        let sump_column = JoinColumn::new(
            EtlUnitRef::measurement("sump"),
            CodomainBinding::new("sump_reading", "sump"),
        );
        let source_join = SourceJoin::new(
            ctx,
            JoinKeys::SubjectTime,
            GroupSignalConfig::new(None, None),
            vec![sump_column],
        );
        let join = JoinPlan {
            joins: vec![source_join],
        };
        let derivations = DerivationPlan::from_derivations(vec![EtlUnitRef::derivation("d1")]);

        let plan = RuntimePlan::new(core, join, derivations);

        assert_eq!(plan.source_count(), 1);
        assert_eq!(plan.join.op_count(), 1);
        assert_eq!(plan.derivations.len(), 1);
    }

    /// Serializing to JSON and back preserves the counts of every part.
    #[test]
    fn serde_roundtrip_runtime_plan() {
        let original = RuntimePlan::new(
            single_source_core(scada()),
            JoinPlan::empty(),
            DerivationPlan::empty(),
        );

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: RuntimePlan = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.source_count(), 1);
        assert_eq!(decoded.join.op_count(), 0);
        assert!(decoded.derivations.is_empty());
    }
}