use std::collections::HashMap;
use std::sync::Arc;
use crate::column::{CanonicalColumnName, SourceColumnName};
use crate::error::{EtlError, EtlResult};
use crate::request::{EtlUnitSubsetRequest, SubjectFilter};
use crate::subject::SubjectValue;
use crate::unit_ref::EtlUnitRef;
use crate::universe::measurement_storage::DataSourceName;
use crate::universe::{MeasurementData, Universe};
use super::bindings::{CodomainBinding, ColumnBinding};
use super::core::ExtractionCore;
use super::crush::{Crush, CrushMember, CrushPlan};
use super::filter::FilterPlan;
use super::join::{GroupSignalConfig, JoinColumn, JoinKeys, JoinPlan, SourceJoin};
use super::runtime::{DerivationPlan, RuntimePlan};
use super::source_context::{SourceContext, SourceKey, SourceMember};
/// Assembles the `RuntimePlan` that serves `request` against `universe`.
///
/// The plan is produced in these stages:
/// 1. Resolve the measurement names to extract: the request's own list, or
///    every key of `universe.measurements` when that list is empty.
/// 2. Split the names into derivations (the schema reports a derivation for
///    the name) and primary measurements (everything else).
/// 3. Group the primary measurements by the first entry of their fragment's
///    `source_arc_ptrs()`, using `0` when the fragment reports none.
/// 4. Build one `SourceContext` per group, in ascending key order.
/// 5. Build the filter, crush, join and derivation plans and combine them.
///
/// # Errors
///
/// Returns `EtlError::UnitNotFound` when a primary name has no entry in
/// `universe.measurements`, and propagates any error from
/// `build_source_context`.
pub fn build_runtime_plan(
    universe: &Universe,
    request: &EtlUnitSubsetRequest,
) -> EtlResult<RuntimePlan> {
    let schema = universe.schema();

    // An empty request means "every measurement known to the universe".
    let requested: Vec<CanonicalColumnName> = if request.measurements.is_empty() {
        universe.measurements.keys().cloned().collect()
    } else {
        request.measurements.clone()
    };

    // `partition` keeps the relative order of the names on both sides.
    let (derived, primary): (Vec<CanonicalColumnName>, Vec<CanonicalColumnName>) = requested
        .into_iter()
        .partition(|name| schema.get_derivation(name).is_some());
    let derivation_refs: Vec<EtlUnitRef> = derived
        .into_iter()
        .map(|name| EtlUnitRef::derivation(name))
        .collect();

    // Bucket the primary measurements by their source key.
    let mut groups: HashMap<usize, Vec<&MeasurementData>> = HashMap::new();
    for name in &primary {
        let md = universe
            .measurements
            .get(name)
            .ok_or_else(|| EtlError::UnitNotFound(format!("Measurement '{}' not found", name)))?;
        let key = md
            .fragment()
            .source_arc_ptrs()
            .first()
            .copied()
            .unwrap_or(0);
        groups.entry(key).or_default().push(md);
    }

    // Visit the groups in ascending key order; keys are unique, so an
    // unstable sort cannot reorder anything observable.
    let mut ordered: Vec<(usize, Vec<&MeasurementData>)> = groups.into_iter().collect();
    ordered.sort_unstable_by_key(|(key, _)| *key);

    // Stops at the first failing source, like a `?` inside a loop would.
    let sources = ordered
        .iter()
        .map(|(key, mds)| {
            build_source_context(SourceKey::from_raw(*key), mds, schema).map(Arc::new)
        })
        .collect::<EtlResult<Vec<Arc<SourceContext>>>>()?;

    let time_range = request.time_range.clone();
    let subject_set = subject_filter_to_values(request.subject_filter.as_ref());
    let filter = FilterPlan::build(sources.iter().cloned(), time_range, subject_set);

    let crush = build_crush_plan(&sources, universe);
    let join = build_join_plan(&sources, universe);
    let derivations = DerivationPlan::from_derivations(derivation_refs);

    Ok(RuntimePlan::new(
        ExtractionCore::new(filter, crush),
        join,
        derivations,
    ))
}
fn build_source_context(
source_key: SourceKey,
mds: &[&MeasurementData],
schema: &crate::EtlSchema,
) -> EtlResult<SourceContext> {
let representative = mds
.first()
.ok_or_else(|| EtlError::Config("build_source_context called with empty group".into()))?;
let frag = representative.fragment();
let source_name = frag
.source_name()
.cloned()
.unwrap_or_else(|| DataSourceName::new(format!("source_{:x}", source_key.as_raw())));
let subject_phys = frag
.physical_column_name(schema.subject.as_str())
.unwrap_or_else(|| schema.subject.as_str().to_string());
let time_phys = frag
.physical_column_name(schema.time.as_str())
.unwrap_or_else(|| schema.time.as_str().to_string());
let subject = ColumnBinding::new(SourceColumnName::new(subject_phys), schema.subject.clone());
let time = ColumnBinding::new(SourceColumnName::new(time_phys), schema.time.clone());
let components: Vec<ColumnBinding> = representative
.unit
.components
.iter()
.map(|c| {
let phys = frag
.physical_column_name(c.as_str())
.unwrap_or_else(|| c.as_str().to_string());
ColumnBinding::new(SourceColumnName::new(phys), c.clone())
})
.collect();
let members: Vec<SourceMember> = mds.iter().map(|md| build_source_member(md)).collect();
Ok(SourceContext {
source_name,
source_key,
subject,
time,
components,
members,
})
}
fn build_source_member(md: &MeasurementData) -> SourceMember {
let frag = md.fragment();
let value_phys = frag
.physical_column_name(md.unit.name.as_str())
.unwrap_or_else(|| md.unit.name.as_str().to_string());
let mut binding = CodomainBinding::new(SourceColumnName::new(value_phys), md.unit.name.clone());
if let Some(ref fill) = md.unit.null_value {
binding = binding.with_source_null_fill(fill.clone());
}
if let Some(ref fill) = md.unit.null_value_extension {
binding = binding.with_join_null_fill(fill.clone());
}
SourceMember::new(EtlUnitRef::measurement(md.unit.name.clone()), binding)
}
/// Builds the `CrushPlan`: one `Crush` for every source that has components.
///
/// Sources for which `has_components()` is false are skipped. Within a
/// source, each member whose unit name is present in `universe.measurements`
/// becomes a `CrushMember` carrying that unit's signal aggregation; members
/// without an entry there are left out.
fn build_crush_plan(sources: &[Arc<SourceContext>], universe: &Universe) -> CrushPlan {
    let mut crushes: Vec<Crush> = Vec::new();

    for src in sources {
        if !src.has_components() {
            continue;
        }

        let mut members: Vec<CrushMember> = Vec::new();
        for member in &src.members {
            // Members unknown to the universe are dropped, not reported.
            if let Some(md) = universe.measurements.get(member.unit.name()) {
                members.push(CrushMember::new(
                    member.unit.clone(),
                    member.value.clone(),
                    md.unit.signal_aggregation(),
                ));
            }
        }

        crushes.push(Crush::components(Arc::clone(src), members));
    }

    CrushPlan { crushes }
}
fn build_join_plan(sources: &[Arc<SourceContext>], universe: &Universe) -> JoinPlan {
use std::collections::HashMap;
let mut joins: Vec<SourceJoin> = Vec::new();
for src in sources {
let mut by_config: HashMap<GroupSignalConfig, Vec<JoinColumn>> = HashMap::new();
let mut config_order: Vec<GroupSignalConfig> = Vec::new();
for member in &src.members {
if !member.unit.is_measurement() {
continue;
}
let mu = match (&universe.measurements).get(member.unit.name()) {
Some(md) => &md.unit,
None => continue,
};
let config = GroupSignalConfig::new(
mu.signal_policy
.as_ref()
.map(|p| p.ttl().as_millis() as i64),
mu.sample_rate_ms,
);
if !by_config.contains_key(&config) {
config_order.push(config.clone());
}
by_config
.entry(config)
.or_default()
.push(JoinColumn::new(member.unit.clone(), member.value.clone()));
}
for config in config_order {
let columns = by_config.remove(&config).expect("config was just inserted");
joins.push(SourceJoin::new(
src.clone(),
JoinKeys::SubjectTime,
config,
columns,
));
}
}
JoinPlan { joins }
}
/// Converts an optional subject filter into the list of subjects to keep.
///
/// Returns `None` when there is no filter, when the filter is any variant
/// other than `SubjectFilter::Include`, or when none of the included values
/// yields a string through `as_str()`. Otherwise returns the string-valued
/// entries, in order, wrapped as `SubjectValue`s.
///
/// NOTE(review): an `Include` list made up only of non-string values
/// collapses to `None`, the same result as having no filter at all — confirm
/// this is intended.
fn subject_filter_to_values(filter: Option<&SubjectFilter>) -> Option<Vec<SubjectValue>> {
    let included = match filter {
        Some(SubjectFilter::Include(values)) => values,
        _ => return None,
    };

    let mut subjects: Vec<SubjectValue> = Vec::new();
    for value in included.iter() {
        // Values that are not strings are skipped.
        if let Some(text) = value.as_str() {
            subjects.push(SubjectValue::new(text.to_string()));
        }
    }

    Some(subjects).filter(|kept| !kept.is_empty())
}
#[cfg(test)]
mod tests {
    // Placeholder: no unit tests are defined in this module yet.
}