use super::{
EtlSchema,
dto::{
ChartHintsDto, ChartTypeDto, DerivationDto, IndexDto, MeasurementDto, QualityDto,
SchemaDto, SeriesDto, SignalPolicyDto, WindowStrategyDto,
},
};
use crate::{
Derivation, EtlResult, MeasurementUnit, QualityUnit, SignalPolicy,
chart_hints::{AxisId, ChartHints, ChartSeries, ChartType, Index},
column::CanonicalColumnName,
signal_policy::WindowStrategy,
};
impl SchemaDto {
pub fn into_schema(self) -> EtlResult<EtlSchema> {
let subject = CanonicalColumnName::new(&self.subject);
let time = CanonicalColumnName::new(&self.time);
let qualities: Vec<QualityUnit> = self
.qualities
.into_iter()
.map(|(name, dto)| convert_quality(&subject, &name, dto))
.collect();
let measurements: Vec<MeasurementUnit> = self
.measurements
.into_iter()
.map(|(name, dto)| convert_measurement(&subject, &time, &name, dto))
.collect::<EtlResult<Vec<_>>>()?;
let derivations: Vec<Derivation> = self
.derivations
.into_iter()
.map(|(name, dto)| convert_derivation(&name, dto))
.collect();
let schema = EtlSchema {
name: self.name,
subject,
time,
qualities,
measurements,
derivations,
quality_derivations: Vec::new(),
};
Ok(schema)
}
}
fn convert_quality(subject: &CanonicalColumnName, name: &str, dto: QualityDto) -> QualityUnit {
let mut quality = QualityUnit::new(subject.as_str(), name);
if let Some(hints_dto) = dto.chart_hints {
quality.chart_hints = Some(convert_chart_hints(hints_dto));
}
quality
}
fn convert_measurement(
subject: &CanonicalColumnName,
time: &CanonicalColumnName,
name: &str,
dto: MeasurementDto,
) -> EtlResult<MeasurementUnit> {
let mut measurement = MeasurementUnit::new(
subject.clone(),
time.clone(),
CanonicalColumnName::new(name),
dto.kind,
);
for component in dto.components {
measurement
.components
.push(CanonicalColumnName::new(component));
}
if let Some(policy_dto) = dto.signal_policy {
measurement.signal_policy = Some(convert_signal_policy(policy_dto)?);
}
if let Some(rate) = dto.sample_rate {
measurement.sample_rate_ms = Some(rate.as_millis() as i64);
}
if let Some(hints_dto) = dto.chart_hints {
measurement.chart_hints = Some(convert_chart_hints(hints_dto));
}
Ok(measurement)
}
fn convert_signal_policy(dto: SignalPolicyDto) -> EtlResult<SignalPolicy> {
let windowing = match dto.windowing {
WindowStrategyDto::Instant => WindowStrategy::Instant,
WindowStrategyDto::Sliding {
duration,
min_samples,
} => WindowStrategy::Sliding {
duration,
min_samples,
},
};
Ok(SignalPolicy {
max_staleness: dto.max_staleness,
windowing,
time_format: dto.time_format,
})
}
fn convert_chart_hints(dto: ChartHintsDto) -> ChartHints {
let mut hints = ChartHints::new();
if let Some(label) = dto.label {
hints = hints.label(label);
}
if let Some(color) = dto.color {
hints = hints.color(color);
}
if let Some(true) = dto.stepped {
hints = hints.stepped();
}
if let Some(t) = dto.tension {
hints = hints.tension(t);
}
if let Some(axis_str) = dto.axis {
hints.axis = match axis_str.as_str() {
"y1" => AxisId::Y1,
"y2" => AxisId::Y2,
_ => AxisId::Y,
};
}
if let Some(chart_type_dto) = dto.chart_type {
hints.chart_type = convert_chart_type(chart_type_dto);
}
if let Some(index_dto) = dto.index {
hints.index = convert_index(index_dto);
}
if let Some(series_dto) = dto.series {
hints.series = convert_series(series_dto);
}
hints
}
fn convert_chart_type(dto: ChartTypeDto) -> ChartType {
match dto {
ChartTypeDto::Line => ChartType::Line,
ChartTypeDto::Bar => ChartType::Bar,
ChartTypeDto::Scatter => ChartType::Scatter,
ChartTypeDto::Bubble { size } => ChartType::Bubble { size },
}
}
fn convert_index(dto: IndexDto) -> Index {
match dto {
IndexDto::Time => Index::Time,
IndexDto::Subject => Index::Subject,
IndexDto::Quality(col) => Index::Quality(col),
IndexDto::Component(col) => Index::Component(col),
}
}
fn convert_series(dto: SeriesDto) -> ChartSeries {
match dto {
SeriesDto::Subject => ChartSeries::Subject,
SeriesDto::Quality(col) => ChartSeries::Quality(col),
SeriesDto::Component(col) => ChartSeries::Component(col),
SeriesDto::SubjectAndComponent(col) => ChartSeries::SubjectAndComponent(col),
}
}
fn convert_derivation(name: &str, dto: DerivationDto) -> Derivation {
let mut derivation =
Derivation::pointwise(name.to_owned(), dto.computation.pointwise).with_kind(dto.kind);
if let Some(hints_dto) = dto.chart_hints {
derivation.chart_hints = Some(convert_chart_hints(hints_dto));
}
derivation
}
#[cfg(test)]
mod tests {
use super::*;
use indexmap::IndexMap;
fn empty_qualities() -> IndexMap<String, QualityDto> {
IndexMap::new()
}
fn empty_derivations() -> IndexMap<String, DerivationDto> {
IndexMap::new()
}
#[test]
fn test_simple_schema_conversion() {
let mut measurements = IndexMap::new();
measurements.insert(
"water_level".to_string(),
MeasurementDto {
kind: crate::MeasurementKind::Measure,
components: vec![],
signal_policy: None,
sample_rate: None,
chart_hints: None,
},
);
let dto = SchemaDto {
name: "test".into(),
subject: "station".into(),
time: "timestamp".into(),
qualities: empty_qualities(),
measurements,
derivations: empty_derivations(),
};
let schema = dto.into_schema().unwrap();
assert_eq!(schema.name, "test");
assert_eq!(schema.subject.as_str(), "station");
assert_eq!(schema.time.as_str(), "timestamp");
assert_eq!(schema.measurements.len(), 1);
assert_eq!(schema.measurements[0].name, "water_level".into());
}
#[test]
fn test_measurement_with_components() {
let mut measurements = IndexMap::new();
measurements.insert(
"sales".to_string(),
MeasurementDto {
kind: crate::MeasurementKind::Count,
components: vec!["color".into(), "size".into()],
signal_policy: None,
sample_rate: None,
chart_hints: None,
},
);
let dto = SchemaDto {
name: "test".into(),
subject: "station".into(),
time: "timestamp".into(),
qualities: empty_qualities(),
measurements,
derivations: empty_derivations(),
};
let schema = dto.into_schema().unwrap();
let m = &schema.measurements[0];
assert_eq!(m.components.len(), 2);
assert_eq!(m.components[0].as_str(), "color");
assert_eq!(m.components[1].as_str(), "size");
}
#[test]
fn test_quality_conversion() {
let mut qualities = IndexMap::new();
qualities.insert("region".to_string(), QualityDto { chart_hints: None });
let dto = SchemaDto {
name: "test".into(),
subject: "station".into(),
time: "timestamp".into(),
qualities,
measurements: IndexMap::new(),
derivations: empty_derivations(),
};
let schema = dto.into_schema().unwrap();
assert_eq!(schema.qualities.len(), 1);
assert_eq!(schema.qualities[0].name, "region".into());
assert_eq!(schema.qualities[0].subject.as_str(), "station");
assert_eq!(schema.qualities[0].value.as_str(), "region");
}
#[test]
fn test_chart_hints_conversion() {
use crate::chart_hints::{ChartType, Index};
let hints_dto = ChartHintsDto {
label: Some("Test Label".into()),
color: Some("#ff0000".into()),
stepped: Some(true),
tension: Some(0.3),
axis: Some("y2".into()),
chart_type: Some(ChartTypeDto::Bar),
index: Some(IndexDto::Subject),
series: Some(SeriesDto::Component("engine_id".into())),
};
let hints = convert_chart_hints(hints_dto);
assert_eq!(hints.label, Some("Test Label".into()));
assert_eq!(hints.color, Some("#ff0000".into()));
assert!(hints.stepped);
assert_eq!(hints.tension, Some(0.3));
assert_eq!(hints.axis, AxisId::Y2);
assert_eq!(hints.chart_type, ChartType::Bar);
assert_eq!(hints.index, Index::Subject);
}
}