use serde::Serialize;
use crate::ResampleStrategy;
use crate::column::CanonicalColumnName;
/// Action applied to a single measurement when it is aligned to the unified
/// rate of an [`AlignmentSpec`].
///
/// Serialized by serde as an internally tagged value: the variant name, in
/// `snake_case`, is stored under the `"type"` key.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AlignAction {
/// No resampling: the measurement keeps its own rate.
///
/// `AlignmentSpec::compute` selects this when the measurement's rate already
/// equals the unified rate, when the rates differ but the measurement has no
/// strategy for the required direction, or when it declares no rate but does
/// have a signal policy.
SignalOnly,
/// Resample with `strategy` because the unified rate value is smaller than
/// the measurement's native rate value.
Upsample { strategy: ResampleStrategy },
/// Resample with `strategy` because the unified rate value is larger than
/// the measurement's native rate value.
Downsample { strategy: ResampleStrategy },
/// Nothing to do: `AlignmentSpec::compute` selects this when the measurement
/// declares neither a native rate nor a signal policy.
PassThrough,
}
/// Alignment decision for one measurement, together with the inputs that
/// `AlignmentSpec::compute` used to reach it.
#[derive(Debug, Clone, Serialize)]
pub struct MeasurementAlignment {
/// Canonical column name identifying the measurement.
pub name: CanonicalColumnName,
/// Rate declared by the measurement's unit (`sample_rate_ms`), or `None`
/// when the unit declares none.
// NOTE(review): presumably a sampling interval in milliseconds, since
// `compute` treats the smallest value as the fastest rate — confirm.
pub native_rate_ms: Option<i64>,
/// TTL of the unit's signal policy in milliseconds, or `None` when the unit
/// has no signal policy.
pub ttl_ms: Option<u64>,
/// Strategy the unit declares for upsampling, if any.
pub upsample_strategy: Option<ResampleStrategy>,
/// Strategy the unit declares for downsampling, if any.
pub downsample_strategy: Option<ResampleStrategy>,
/// Action chosen for this measurement.
pub action: AlignAction,
/// Rate the measurement ends up at: the unified rate when it is upsampled
/// or downsampled, otherwise its native rate (or the unified rate when no
/// native rate is declared).
pub effective_rate_ms: i64,
}
/// Alignment plan for a set of measurements: the common rate they are aligned
/// to and the per-measurement decisions.
#[derive(Debug, Clone, Serialize)]
pub struct AlignmentSpec {
/// Rate selected by `compute` as the common target for resampling.
pub unified_rate_ms: i64,
/// One entry per requested measurement that was found, in request order.
pub measurements: Vec<MeasurementAlignment>,
}
impl AlignmentSpec {
    /// Builds the alignment plan for the `requested` measurements.
    ///
    /// Requested names that are missing from `measurements` are skipped. The
    /// remaining measurements appear in the result in request order.
    ///
    /// The unified rate is chosen from the declared native rates, where the
    /// smallest value is considered the fastest:
    ///
    /// * if all declared rates are equal, that rate is used;
    /// * otherwise the fastest rate is used when every measurement with a
    ///   slower declared rate has an upsample strategy;
    /// * otherwise the slowest rate is used.
    ///
    /// Each measurement then gets an [`AlignAction`]: `Upsample`/`Downsample`
    /// when its rate differs from the unified rate and it declares a strategy
    /// for that direction, `SignalOnly` when it stays at its own rate, and
    /// `PassThrough` when it has neither a native rate nor a signal policy.
    ///
    /// Returns `None` when none of the found measurements declares a native
    /// rate, because no unified rate can be derived.
    pub fn compute(
        measurements: &std::collections::HashMap<
            CanonicalColumnName,
            super::universe_of_etlunits::MeasurementData,
        >,
        requested: &[CanonicalColumnName],
    ) -> Option<Self> {
        // One row per requested measurement that exists in `measurements`:
        // (name, native rate, TTL in ms, upsample strategy, downsample strategy).
        let infos: Vec<(
            CanonicalColumnName,
            Option<i64>,
            Option<u64>,
            Option<ResampleStrategy>,
            Option<ResampleStrategy>,
        )> = requested
            .iter()
            .filter_map(|name| {
                let md = measurements.get(name)?;
                let ttl = md.unit.signal_policy.as_ref().map(|policy| {
                    // The millisecond count can be wider than `u64`; saturate
                    // instead of silently wrapping on conversion.
                    <u64 as std::convert::TryFrom<_>>::try_from(policy.ttl().as_millis())
                        .unwrap_or(u64::MAX)
                });
                Some((
                    name.clone(),
                    md.unit.sample_rate_ms,
                    ttl,
                    md.unit.upsample_strategy,
                    md.unit.downsample_strategy,
                ))
            })
            .collect();

        // Without at least one declared rate there is nothing to unify on.
        let fastest = infos.iter().filter_map(|(_, native, ..)| *native).min()?;
        let slowest = infos.iter().filter_map(|(_, native, ..)| *native).max()?;

        let unified_rate_ms = if fastest == slowest {
            fastest
        } else {
            // Only measurements with a *declared* slower rate need an upsample
            // strategy; measurements without a rate never block the fast rate.
            let slow_can_upsample = infos
                .iter()
                .filter(|(_, native, ..)| matches!(native, Some(rate) if *rate > fastest))
                .all(|(_, _, _, up, _)| up.is_some());
            if slow_can_upsample {
                fastest
            } else {
                slowest
            }
        };

        let alignments = infos
            .into_iter()
            .map(|(name, native, ttl, up, down)| {
                let native_ms = native.unwrap_or(unified_rate_ms);
                let action = match native {
                    // `ttl` is `Some` exactly when the unit has a signal policy.
                    None if ttl.is_some() => AlignAction::SignalOnly,
                    None => AlignAction::PassThrough,
                    Some(rate) if unified_rate_ms < rate => up
                        .map_or(AlignAction::SignalOnly, |strategy| AlignAction::Upsample {
                            strategy,
                        }),
                    Some(rate) if unified_rate_ms > rate => down
                        .map_or(AlignAction::SignalOnly, |strategy| AlignAction::Downsample {
                            strategy,
                        }),
                    Some(_) => AlignAction::SignalOnly,
                };
                // Only resampled measurements move to the unified rate.
                let effective_rate_ms = match action {
                    AlignAction::Upsample { .. } | AlignAction::Downsample { .. } => {
                        unified_rate_ms
                    }
                    AlignAction::SignalOnly | AlignAction::PassThrough => native_ms,
                };
                MeasurementAlignment {
                    name,
                    native_rate_ms: native,
                    ttl_ms: ttl,
                    upsample_strategy: up,
                    downsample_strategy: down,
                    action,
                    effective_rate_ms,
                }
            })
            .collect();

        Some(AlignmentSpec {
            unified_rate_ms,
            measurements: alignments,
        })
    }

    /// Returns the action chosen for `name`, or `None` when the measurement is
    /// not part of this spec.
    pub fn action_for(&self, name: &CanonicalColumnName) -> Option<&AlignAction> {
        self.info_for(name).map(|m| &m.action)
    }

    /// Returns the first alignment entry for `name`, or `None` when the
    /// measurement is not part of this spec.
    pub fn info_for(&self, name: &CanonicalColumnName) -> Option<&MeasurementAlignment> {
        self.measurements.iter().find(|m| &m.name == name)
    }
}