use serde::{Deserialize, Serialize};
use super::build::BuildPlan;
use super::core::ExtractionCore;
use super::runtime::RuntimePlan;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum ProcessingPlan {
Runtime(RuntimePlan),
Build(BuildPlan),
}
impl ProcessingPlan {
pub fn as_runtime(&self) -> Option<&RuntimePlan> {
if let Self::Runtime(p) = self {
Some(p)
} else {
None
}
}
pub fn as_build(&self) -> Option<&BuildPlan> {
if let Self::Build(p) = self {
Some(p)
} else {
None
}
}
pub fn core(&self) -> &ExtractionCore {
match self {
Self::Runtime(p) => &p.core,
Self::Build(p) => &p.core,
}
}
pub fn source_count(&self) -> usize {
self.core().source_count()
}
pub fn mode(&self) -> &'static str {
match self {
Self::Runtime(_) => "runtime",
Self::Build(_) => "build",
}
}
}
impl From<RuntimePlan> for ProcessingPlan {
fn from(plan: RuntimePlan) -> Self {
Self::Runtime(plan)
}
}
impl From<BuildPlan> for ProcessingPlan {
fn from(plan: BuildPlan) -> Self {
Self::Build(plan)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use crate::aggregation::Aggregate;
use crate::plan::bindings::{CodomainBinding, ColumnBinding};
use crate::plan::build::BuildSink;
use crate::plan::crush::{Crush, CrushMember, CrushPlan};
use crate::plan::filter::{FilterPlan, SourceFilter};
use crate::plan::join::JoinPlan;
use crate::plan::runtime::DerivationPlan;
use crate::plan::source_context::{SourceContext, SourceKey, SourceMember};
use crate::unit_ref::EtlUnitRef;
use crate::universe::measurement_storage::DataSourceName;
fn make_source() -> Arc<SourceContext> {
Arc::new(SourceContext {
source_name: DataSourceName::new("scada"),
source_key: SourceKey::from_raw(0xA1),
subject: ColumnBinding::new("station_id", "station_name"),
time: ColumnBinding::new("obs_time", "timestamp"),
components: vec![],
members: vec![SourceMember::new(
EtlUnitRef::measurement("sump"),
CodomainBinding::new("sump_reading", "sump"),
)],
})
}
fn make_runtime_plan() -> RuntimePlan {
let src = make_source();
RuntimePlan::new(
ExtractionCore::new(
FilterPlan {
filters: vec![SourceFilter::from_context(src, None, None)],
},
CrushPlan::empty(),
),
JoinPlan::empty(),
DerivationPlan::empty(),
)
}
fn make_build_plan() -> BuildPlan {
let src = make_source();
BuildPlan::qualities(
ExtractionCore::new(
FilterPlan {
filters: vec![SourceFilter::from_context(src.clone(), None, None)],
},
CrushPlan {
crushes: vec![Crush::time(
src,
vec![CrushMember::new(
EtlUnitRef::quality("station_label"),
CodomainBinding::new("display_name", "station_label"),
Aggregate::First,
)],
)],
},
),
"/tmp/qualities.parquet",
)
}
#[test]
fn from_runtime_plan_via_into() {
let rt = make_runtime_plan();
let plan: ProcessingPlan = rt.into();
assert!(plan.as_runtime().is_some());
assert!(plan.as_build().is_none());
assert_eq!(plan.mode(), "runtime");
}
#[test]
fn from_build_plan_via_into() {
let bp = make_build_plan();
let plan: ProcessingPlan = bp.into();
assert!(plan.as_build().is_some());
assert!(plan.as_runtime().is_none());
assert_eq!(plan.mode(), "build");
}
#[test]
fn shared_core_accessor_works_for_both_variants() {
let runtime: ProcessingPlan = make_runtime_plan().into();
let build: ProcessingPlan = make_build_plan().into();
assert_eq!(runtime.core().source_count(), 1);
assert_eq!(build.core().source_count(), 1);
assert_eq!(runtime.source_count(), 1);
assert_eq!(build.source_count(), 1);
}
#[test]
fn serde_roundtrip_runtime_variant() {
let plan: ProcessingPlan = make_runtime_plan().into();
let json = serde_json::to_string(&plan).unwrap();
assert!(json.contains("\"mode\":\"runtime\""));
let back: ProcessingPlan = serde_json::from_str(&json).unwrap();
assert!(back.as_runtime().is_some());
}
#[test]
fn serde_roundtrip_build_variant() {
let plan: ProcessingPlan = make_build_plan().into();
let json = serde_json::to_string(&plan).unwrap();
assert!(json.contains("\"mode\":\"build\""));
let back: ProcessingPlan = serde_json::from_str(&json).unwrap();
assert!(back.as_build().is_some());
}
#[test]
fn standalone_types_remain_usable() {
let rt = make_runtime_plan();
fn takes_runtime(p: &RuntimePlan) -> usize {
p.source_count()
}
assert_eq!(takes_runtime(&rt), 1);
let bp = make_build_plan();
fn takes_build(p: &BuildPlan) -> usize {
p.source_count()
}
assert_eq!(takes_build(&bp), 1);
}
}