use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use super::core::ExtractionCore;
/// Sink targeted by a [`BuildPlan`].
///
/// Serialized as an internally tagged enum: the snake_case variant name is
/// stored in a `"kind"` field next to the variant's own fields, e.g.
/// `{"kind":"qualities","path":"/q.parquet"}`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case", tag = "kind")]
pub enum BuildSink {
    /// Qualities output located at `path`. Nothing here enforces a file
    /// format; the tests use `.parquet` paths.
    Qualities { path: PathBuf },
}
impl BuildSink {
    /// Creates a [`BuildSink::Qualities`] sink pointing at `path`.
    ///
    /// `path` may be anything convertible into a [`PathBuf`] (`&str`,
    /// `String`, `PathBuf`, ...).
    #[must_use]
    pub fn qualities(path: impl Into<PathBuf>) -> Self {
        Self::Qualities { path: path.into() }
    }

    /// Returns `true` if this sink is the [`BuildSink::Qualities`] variant.
    ///
    /// Only one variant exists at present, so this currently always returns
    /// `true`.
    #[must_use]
    pub fn is_qualities(&self) -> bool {
        matches!(self, Self::Qualities { .. })
    }
}
/// Build-time plan: an [`ExtractionCore`] paired with the [`BuildSink`] it
/// targets.
///
/// Both fields are public, so a plan can be written as a struct literal as
/// well as through [`BuildPlan::new`] or [`BuildPlan::qualities`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct BuildPlan {
    /// Extraction stages (filter and crush plans). The same core type is
    /// also consumed by other plan types, such as `RuntimePlan`.
    pub core: ExtractionCore,
    /// Sink this plan targets.
    pub sink: BuildSink,
}
impl BuildPlan {
    /// Assembles a plan from an extraction `core` and a `sink`.
    #[must_use]
    pub fn new(core: ExtractionCore, sink: BuildSink) -> Self {
        Self { core, sink }
    }

    /// Shorthand for `BuildPlan::new(core, BuildSink::qualities(path))`.
    ///
    /// `path` may be anything convertible into a [`PathBuf`].
    #[must_use]
    pub fn qualities(core: ExtractionCore, path: impl Into<PathBuf>) -> Self {
        Self::new(core, BuildSink::qualities(path))
    }

    /// Source count of this plan; delegates to
    /// [`ExtractionCore::source_count`].
    #[must_use]
    pub fn source_count(&self) -> usize {
        self.core.source_count()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use crate::aggregation::Aggregate;
    use crate::plan::bindings::{CodomainBinding, ColumnBinding};
    use crate::plan::crush::{Crush, CrushMember, CrushPlan};
    use crate::plan::filter::{FilterPlan, SourceFilter};
    use crate::plan::join::JoinPlan;
    use crate::plan::runtime::{DerivationPlan, RuntimePlan};
    use crate::plan::source_context::{SourceContext, SourceKey, SourceMember};
    use crate::unit_ref::EtlUnitRef;
    use crate::universe::measurement_storage::DataSourceName;

    /// "scada" source context with a single `station_label` quality member.
    fn scada_with_qualities() -> Arc<SourceContext> {
        let label = SourceMember::new(
            EtlUnitRef::quality("station_label"),
            CodomainBinding::new("display_name", "station_label"),
        );
        Arc::new(SourceContext {
            source_name: DataSourceName::new("scada"),
            source_key: SourceKey::from_raw(0xA1),
            subject: ColumnBinding::new("station_id", "station_name"),
            time: ColumnBinding::new("obs_time", "timestamp"),
            components: vec![],
            members: vec![label],
        })
    }

    /// Filter plan holding exactly one filter built from `src`, with both
    /// optional `from_context` arguments left as `None`.
    fn single_filter(src: Arc<SourceContext>) -> FilterPlan {
        FilterPlan {
            filters: vec![SourceFilter::from_context(src, None, None)],
        }
    }

    /// Extraction core that filters `src` and has an empty crush plan.
    fn filter_only_core(src: Arc<SourceContext>) -> ExtractionCore {
        ExtractionCore::new(single_filter(src), CrushPlan::empty())
    }

    #[test]
    fn build_sink_qualities_constructor() {
        let sink = BuildSink::qualities("/tmp/qualities.parquet");
        assert!(sink.is_qualities());

        // Irrefutable while `Qualities` is the only variant.
        let BuildSink::Qualities { path } = sink;
        assert_eq!(path, PathBuf::from("/tmp/qualities.parquet"));
    }

    #[test]
    fn qualities_constructor_assembles_full_plan() {
        let src = scada_with_qualities();
        let filters = single_filter(Arc::clone(&src));

        let label_first = CrushMember::new(
            EtlUnitRef::quality("station_label"),
            CodomainBinding::new("display_name", "station_label"),
            Aggregate::First,
        );
        let crush = CrushPlan {
            crushes: vec![Crush::time(src, vec![label_first])],
        };

        let plan = BuildPlan::qualities(
            ExtractionCore::new(filters, crush),
            "/store/scada/qualities.parquet",
        );

        assert_eq!(plan.source_count(), 1);
        assert_eq!(plan.core.crush.time_count(), 1);
        assert!(plan.sink.is_qualities());
    }

    #[test]
    fn shared_core_works_for_both_plan_types() {
        let core = filter_only_core(scada_with_qualities());

        let runtime = RuntimePlan::new(core.clone(), JoinPlan::empty(), DerivationPlan::empty());
        let build = BuildPlan::qualities(core, "/q.parquet");

        assert_eq!(runtime.source_count(), 1);
        assert_eq!(build.source_count(), 1);
    }

    #[test]
    fn serde_roundtrip_build_plan() {
        let plan = BuildPlan::qualities(filter_only_core(scada_with_qualities()), "/q.parquet");

        let json = serde_json::to_string(&plan).unwrap();
        assert!(json.contains("\"kind\":\"qualities\""));
        assert!(json.contains("/q.parquet"));

        let back: BuildPlan = serde_json::from_str(&json).unwrap();
        assert!(back.sink.is_qualities());
        assert_eq!(back.source_count(), 1);
    }
}