use serde::{Deserialize, Serialize};
use super::null_value::NullValue;
use crate::{
aggregation::Aggregate,
chart_hints::ChartHints,
column::{CanonicalColumnName, DomainSignature},
signal_policy::SignalPolicy,
};
/// Semantic kind of a measurement's value column.
///
/// The kind drives several defaults in `impl MeasurementKind`: the aggregation used when
/// no explicit signal aggregation is configured, the default null value, and which
/// `NullValue` variants are accepted as a null replacement. Serialized in `snake_case`
/// (e.g. `"categorical"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MeasurementKind {
    /// Counted quantity; default aggregation `Aggregate::Sum`, default null `Integer(0)`.
    Count,
    /// Generic numeric measure; default aggregation `Aggregate::Mean`, default null `Float(0.0)`.
    Measure,
    /// Pre-averaged value; default aggregation `Aggregate::Mean`, default null `Float(0.0)`.
    Average,
    /// Category label; default aggregation `Aggregate::Last`, default null empty string.
    Categorical,
    /// On/off style value; default aggregation `Aggregate::Any`, default null `Boolean(false)`.
    Binary,
}
impl MeasurementKind {
pub fn default_aggregation(&self) -> Aggregate {
match self {
MeasurementKind::Measure => Aggregate::Mean,
MeasurementKind::Count => Aggregate::Sum,
MeasurementKind::Categorical => Aggregate::Last,
MeasurementKind::Average => Aggregate::Mean,
MeasurementKind::Binary => Aggregate::Any,
}
}
pub fn default_null_value(&self) -> NullValue {
match self {
MeasurementKind::Count => NullValue::Integer(0),
MeasurementKind::Measure => NullValue::Float(0.0),
MeasurementKind::Average => NullValue::Float(0.0),
MeasurementKind::Categorical => NullValue::String(String::new()),
MeasurementKind::Binary => NullValue::Boolean(false),
}
}
pub fn is_compatible_null_value(&self, value: &NullValue) -> bool {
match (self, value) {
(MeasurementKind::Count, NullValue::Integer(_)) => true,
(MeasurementKind::Count, NullValue::Float(_)) => true,
(MeasurementKind::Measure, NullValue::Float(_)) => true,
(MeasurementKind::Measure, NullValue::Integer(_)) => true,
(MeasurementKind::Average, NullValue::Float(_)) => true,
(MeasurementKind::Average, NullValue::Integer(_)) => true,
(MeasurementKind::Categorical, NullValue::String(_)) => true,
(MeasurementKind::Categorical, NullValue::Integer(_)) => true,
(MeasurementKind::Binary, NullValue::Boolean(_)) => true,
(MeasurementKind::Binary, NullValue::Integer(_)) => true,
_ => false,
}
}
pub fn expected_null_value_types(&self) -> &'static str {
match self {
MeasurementKind::Count => "NullValue::Integer or NullValue::Float",
MeasurementKind::Measure => "NullValue::Float or NullValue::Integer",
MeasurementKind::Average => "NullValue::Float or NullValue::Integer",
MeasurementKind::Categorical => "NullValue::String or NullValue::Integer",
MeasurementKind::Binary => "NullValue::Boolean or NullValue::Integer",
}
}
}
/// Whether a measurement's data is historical or a forecast.
///
/// Defaults to `Historical`. Serialized in `snake_case` (`"historical"` / `"forecast"`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DataTemporality {
    /// Historical data (the default).
    #[default]
    Historical,
    /// Forecast data.
    Forecast,
}
impl DataTemporality {
pub fn is_historical(&self) -> bool {
matches!(self, DataTemporality::Historical)
}
pub fn is_forecast(&self) -> bool {
matches!(self, DataTemporality::Forecast)
}
}
impl std::fmt::Display for DataTemporality {
    /// Writes the lowercase label: `"historical"` or `"forecast"`.
    ///
    /// Formatting flags are honoured the same way `str`'s `Display` honours them, so
    /// `format!("{:>12}", t)` right-aligns the label in a 12-column field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            DataTemporality::Historical => "historical",
            DataTemporality::Forecast => "forecast",
        };
        // `pad` (unlike `write!`) applies the caller's width/fill/alignment/precision.
        // The output is unchanged when no flags are given.
        f.pad(label)
    }
}
/// Maps raw JSON values of a binary measurement onto true / false.
///
/// `false_values` has two modes:
/// - `Some(list)`: only values in `list` are false.
/// - `None`: every value not in `true_values` is false.
///
/// In both modes `TruthMapping::is_false` implements this.
#[derive(PartialEq, Eq, Debug, Clone, Default, Serialize, Deserialize)]
pub struct TruthMapping {
    /// Values treated as true. Omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub true_values: Vec<serde_json::Value>,
    /// Explicit false values, or `None` to treat "not true" as false.
    /// Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub false_values: Option<Vec<serde_json::Value>>,
}
impl TruthMapping {
pub fn new() -> Self {
Self::default()
}
pub fn numeric() -> Self {
Self {
true_values: vec![serde_json::Value::from(1), serde_json::Value::from(1.0)],
false_values: Some(vec![
serde_json::Value::from(0),
serde_json::Value::from(0.0),
]),
}
}
pub fn boolean() -> Self {
Self {
true_values: vec![serde_json::Value::Bool(true)],
false_values: Some(vec![serde_json::Value::Bool(false)]),
}
}
pub fn with_true_values<I, V>(mut self, values: I) -> Self
where
I: IntoIterator<Item = V>,
V: Into<serde_json::Value>,
{
self.true_values = values.into_iter().map(|v| v.into()).collect();
self
}
pub fn with_false_values<I, V>(mut self, values: I) -> Self
where
I: IntoIterator<Item = V>,
V: Into<serde_json::Value>,
{
self.false_values = Some(values.into_iter().map(|v| v.into()).collect());
self
}
pub fn true_strings<I, S>(mut self, values: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.true_values.extend(
values
.into_iter()
.map(|s| serde_json::Value::String(s.into())),
);
self
}
pub fn false_strings<I, S>(mut self, values: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
let false_vals = self.false_values.get_or_insert_with(Vec::new);
false_vals.extend(
values
.into_iter()
.map(|s| serde_json::Value::String(s.into())),
);
self
}
pub fn is_true(&self, value: &serde_json::Value) -> bool {
self.true_values.contains(value)
}
pub fn is_false(&self, value: &serde_json::Value) -> bool {
match &self.false_values {
Some(false_vals) => false_vals.contains(value),
None => !self.is_true(value),
}
}
pub fn has_true_values(&self) -> bool {
!self.true_values.is_empty()
}
pub fn has_false_values(&self) -> bool {
self.false_values
.as_ref()
.map(|v| !v.is_empty())
.unwrap_or(false)
}
}
/// Describes one measured value column together with the columns that key it.
///
/// `PartialEq`/`Eq` are implemented by hand for this type and compare only a subset of the
/// fields. See that impl for the exact list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeasurementUnit {
    /// Display/identity name; `MeasurementUnit::new` initialises it to the value column.
    pub name: CanonicalColumnName,
    /// Subject (entity) key column.
    pub subject: CanonicalColumnName,
    /// Time key column.
    pub time: CanonicalColumnName,
    /// Extra key columns beyond subject and time. Omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub components: Vec<CanonicalColumnName>,
    /// Column holding the measured value.
    pub value: CanonicalColumnName,
    /// Semantic kind of the value; drives default aggregation, null value and chart hints.
    pub kind: MeasurementKind,
    /// Explicit null replacement; `None` means none configured here.
    pub null_value: Option<NullValue>,
    /// Secondary null value. NOTE(review): its exact use is not visible in this file — confirm.
    pub null_value_extension: Option<NullValue>,
    /// Optional signal policy.
    pub signal_policy: Option<SignalPolicy>,
    /// Explicit chart hints. When `None`, `effective_chart_hints` derives them from `kind`.
    pub chart_hints: Option<ChartHints>,
    /// Overrides `kind.default_aggregation()` when set.
    pub signal_aggregation: Option<Aggregate>,
    /// Truth mapping for binary units. When `None`, `effective_truth_mapping` falls back
    /// to `TruthMapping::numeric()` for binary kinds.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub truth_mapping: Option<TruthMapping>,
    /// Historical vs. forecast data; defaults to `Historical` when absent.
    #[serde(default)]
    pub temporality: DataTemporality,
    /// Sample rate in milliseconds, stored as given (no validation here).
    #[serde(default)]
    pub sample_rate_ms: Option<i64>,
    /// Strategy used when upsampling, if any.
    #[serde(default)]
    pub upsample_strategy: Option<ResampleStrategy>,
    /// Strategy used when downsampling, if any.
    #[serde(default)]
    pub downsample_strategy: Option<ResampleStrategy>,
}
/// Strategy for changing a unit's sampling rate.
///
/// Referenced by `MeasurementUnit::upsample_strategy` and
/// `MeasurementUnit::downsample_strategy`. Serialized in `snake_case` (e.g. `"forward_fill"`).
/// The variant names describe the intended fill or aggregation. The resampling itself is
/// implemented elsewhere (not visible in this file).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResampleStrategy {
    ForwardFill,
    Interpolate,
    Null,
    Mean,
    Max,
    Min,
    Sum,
    Last,
}
/// Two units are equal when they share the same subject, time, components and value
/// columns, and the same signal policy and truth mapping.
///
/// NOTE(review): `name`, `kind`, null values, chart hints, signal aggregation,
/// temporality and sampling settings are deliberately (or accidentally) ignored. Confirm
/// this is the intended identity before relying on it.
impl PartialEq for MeasurementUnit {
    fn eq(&self, other: &Self) -> bool {
        let mine = (
            &self.subject,
            &self.time,
            &self.components,
            &self.value,
            &self.signal_policy,
            &self.truth_mapping,
        );
        let theirs = (
            &other.subject,
            &other.time,
            &other.components,
            &other.value,
            &other.signal_policy,
            &other.truth_mapping,
        );
        mine == theirs
    }
}

impl Eq for MeasurementUnit {}
impl MeasurementUnit {
    /// Creates a unit for the `value` column, keyed by `subject` and `time`.
    ///
    /// Initial state:
    /// - `name` is initialised to the value column.
    /// - Components start empty.
    /// - Every optional setting starts as `None`: null values, signal policy, chart hints,
    ///   signal aggregation, truth mapping, sample rate and resample strategies.
    /// - `temporality` starts at `DataTemporality::default()` (`Historical`).
    pub fn new(
        subject: impl Into<CanonicalColumnName>,
        time: impl Into<CanonicalColumnName>,
        value: impl Into<CanonicalColumnName>,
        kind: MeasurementKind,
    ) -> Self {
        let value: CanonicalColumnName = value.into();
        Self {
            name: value.clone(),
            subject: subject.into(),
            time: time.into(),
            components: Vec::new(),
            value,
            kind,
            null_value: None,
            null_value_extension: None,
            chart_hints: None,
            signal_policy: None,
            signal_aggregation: None,
            truth_mapping: None,
            temporality: DataTemporality::default(),
            sample_rate_ms: None,
            upsample_strategy: None,
            downsample_strategy: None,
        }
    }

    /// Marks the unit's data as historical.
    pub fn historical(mut self) -> Self {
        self.temporality = DataTemporality::Historical;
        self
    }

    /// Marks the unit's data as a forecast.
    pub fn forecast(mut self) -> Self {
        self.temporality = DataTemporality::Forecast;
        self
    }

    /// Whether the unit's temporality is `Forecast`.
    pub fn is_forecast(&self) -> bool {
        self.temporality.is_forecast()
    }

    /// Whether the unit's temporality is `Historical`.
    pub fn is_historical(&self) -> bool {
        self.temporality.is_historical()
    }

    /// Replaces the component columns with `components`.
    ///
    /// Accepts any iterable of string-like names (a `Vec`, an array, an iterator, ...).
    /// Previously configured components are discarded. Use
    /// [`MeasurementUnit::with_component`] to append a single component instead.
    pub fn with_components(
        mut self,
        components: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        self.components = components
            .into_iter()
            .map(|c| CanonicalColumnName::new(c))
            .collect();
        self
    }

    /// Appends one component column after any existing ones.
    pub fn with_component(mut self, component: impl Into<String>) -> Self {
        self.components.push(CanonicalColumnName::new(component));
        self
    }

    /// Sets the explicit null replacement.
    pub fn with_null_value(mut self, value: NullValue) -> Self {
        self.null_value = Some(value);
        self
    }

    /// Sets `null_value_extension`.
    pub fn with_null_extension(mut self, value: NullValue) -> Self {
        self.null_value_extension = Some(value);
        self
    }

    /// Sets the signal policy.
    pub fn with_signal_policy(mut self, policy: SignalPolicy) -> Self {
        self.signal_policy = Some(policy);
        self
    }

    /// Sets explicit chart hints, overriding the kind-based defaults.
    pub fn with_chart_hints(mut self, hints: ChartHints) -> Self {
        self.chart_hints = Some(hints);
        self
    }

    /// Sets an explicit truth mapping.
    pub fn with_truth_mapping(mut self, mapping: TruthMapping) -> Self {
        self.truth_mapping = Some(mapping);
        self
    }

    /// Builds a measurement `DomainSignature` from the subject, time and component columns.
    pub fn domain_signature(&self) -> DomainSignature {
        DomainSignature::measurement(self.subject.as_str(), self.time.as_str()).with_components(
            self.components
                .iter()
                .map(|c| c.as_str().to_string())
                .collect(),
        )
    }

    /// Aggregate for this unit's signal.
    ///
    /// Returns the explicit override if set, otherwise `kind.default_aggregation()`.
    pub fn signal_aggregation(&self) -> Aggregate {
        self.signal_aggregation
            .unwrap_or_else(|| self.kind.default_aggregation())
    }

    /// Overrides the kind's default aggregation.
    pub fn with_signal_aggregation(mut self, agg: Aggregate) -> Self {
        self.signal_aggregation = Some(agg);
        self
    }

    /// Sets `sample_rate_ms`; the value is stored as given (no validation).
    pub fn with_sample_rate_ms(mut self, rate_ms: i64) -> Self {
        self.sample_rate_ms = Some(rate_ms);
        self
    }

    /// Sets the upsampling strategy.
    pub fn with_upsample(mut self, strategy: ResampleStrategy) -> Self {
        self.upsample_strategy = Some(strategy);
        self
    }

    /// Sets the downsampling strategy.
    pub fn with_downsample(mut self, strategy: ResampleStrategy) -> Self {
        self.downsample_strategy = Some(strategy);
        self
    }

    /// Chart hints to use for this unit.
    ///
    /// Returns the explicit hints if set. Otherwise returns `ChartHints::categorical()` for
    /// categorical and binary kinds, and `ChartHints::measure()` for everything else.
    pub fn effective_chart_hints(&self) -> ChartHints {
        self.chart_hints.clone().unwrap_or_else(|| match self.kind {
            MeasurementKind::Categorical | MeasurementKind::Binary => ChartHints::categorical(),
            _ => ChartHints::measure(),
        })
    }

    /// Whether the unit's kind is `Binary`.
    pub fn is_binary(&self) -> bool {
        matches!(self.kind, MeasurementKind::Binary)
    }

    /// Truth mapping to apply to this unit's values.
    ///
    /// Non-binary units have none (`None`). Binary units use their explicit mapping, or
    /// `TruthMapping::numeric()` when none was configured.
    pub fn effective_truth_mapping(&self) -> Option<TruthMapping> {
        self.is_binary().then(|| {
            self.truth_mapping
                .clone()
                .unwrap_or_else(TruthMapping::numeric)
        })
    }

    /// Borrowed signal policy, if any.
    pub fn signal_policy(&self) -> Option<&SignalPolicy> {
        self.signal_policy.as_ref()
    }

    /// Ordered column list identifying this unit: subject, time, each component, then value.
    pub fn etl_unit_signature(&self) -> Vec<CanonicalColumnName> {
        // Exact size is known: subject + time + components + value.
        let mut signature = Vec::with_capacity(self.components.len() + 3);
        signature.push(self.subject.clone());
        signature.push(self.time.clone());
        // Extending with an empty slice is a no-op, so no emptiness check is needed.
        signature.extend(self.components.iter().cloned());
        signature.push(self.value.clone());
        signature
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Unit keyed by the `station_id` / `observation_time` columns shared by several tests.
    fn station_unit(value: &'static str, kind: MeasurementKind) -> MeasurementUnit {
        MeasurementUnit::new("station_id", "observation_time", value, kind)
    }

    /// Wraps a string slice as a JSON string value.
    fn json_str(raw: &str) -> serde_json::Value {
        serde_json::Value::String(raw.into())
    }

    #[test]
    fn test_simple_measurement() {
        let unit = station_unit("sump_ft", MeasurementKind::Measure);
        assert_eq!(unit.name, "sump_ft".into());
        assert_eq!(unit.subject.as_str(), "station_id");
        assert_eq!(unit.time.as_str(), "observation_time");
        assert_eq!(unit.value.as_str(), "sump_ft");
        assert!(unit.components.is_empty());
        assert_eq!(unit.kind.default_aggregation(), Aggregate::Mean);
    }

    #[test]
    fn test_measurement_with_components() {
        let unit = MeasurementUnit::new("store_id", "sale_date", "units_sold", MeasurementKind::Count)
            .with_components(vec!["color", "size"]);
        assert_eq!(unit.components.len(), 2);
        assert_eq!(unit.kind.default_aggregation(), Aggregate::Sum);

        let signature = unit.domain_signature();
        assert_eq!(signature.components.len(), 2);
        assert_eq!(signature.components[0].as_str(), "color");
        assert_eq!(signature.components[1].as_str(), "size");
    }

    #[test]
    fn test_domain_signature() {
        let signature =
            MeasurementUnit::new("sensor_id", "reading_time", "temp_c", MeasurementKind::Measure)
                .domain_signature();
        assert_eq!(signature.subject.as_str(), "sensor_id");
        assert_eq!(signature.time.as_ref().map(|t| t.as_str()), Some("reading_time"));
        assert!(signature.components.is_empty());
    }

    #[test]
    fn test_categorical_chart_hints() {
        let hints = station_unit("engine_1", MeasurementKind::Categorical).effective_chart_hints();
        assert!(hints.stepped);
    }

    #[test]
    fn test_binary_measurement() {
        let unit = station_unit("engine_status", MeasurementKind::Binary);
        assert!(unit.is_binary());
        assert_eq!(unit.kind.default_aggregation(), Aggregate::Any);
        let effective = unit.effective_truth_mapping().unwrap();
        assert!(effective.has_true_values());
    }

    #[test]
    fn test_binary_with_string_truth_mapping() {
        let unit = station_unit("engine_status", MeasurementKind::Binary).with_truth_mapping(
            TruthMapping::new()
                .true_strings(["on", "running", "active"])
                .false_strings(["off", "stopped", "inactive"]),
        );
        let mapping = unit.truth_mapping.as_ref().unwrap();
        for truthy in ["on", "running"] {
            assert!(mapping.is_true(&json_str(truthy)));
        }
        assert!(mapping.is_false(&json_str("off")));
        assert!(!mapping.is_true(&json_str("unknown")));
    }

    #[test]
    fn test_truth_mapping_numeric() {
        let mapping = TruthMapping::numeric();
        assert!(mapping.is_true(&serde_json::Value::from(1)));
        assert!(mapping.is_true(&serde_json::Value::from(1.0)));
        assert!(mapping.is_false(&serde_json::Value::from(0)));
        assert!(mapping.is_false(&serde_json::Value::from(0.0)));
    }

    #[test]
    fn test_truth_mapping_implicit_false() {
        let mapping = TruthMapping::new().true_strings(["yes"]);
        assert!(mapping.is_true(&json_str("yes")));
        for falsy in ["no", "maybe"] {
            assert!(mapping.is_false(&json_str(falsy)));
        }
    }
}