mod assembly;
mod derivation;
mod extraction;
mod fragment;
mod info;
pub mod measurement_storage;
mod universe_of_etlunits;
use std::{collections::HashMap, time::Instant};
pub use extraction::extract_all_fragments;
pub use fragment::{
ComposedMeasurement, ComposedQuality, CrushedComponent, EtlUnitFragment, FragmentAccumulator,
MeasurementFragment, QualityFragment, stack_all_fragments,
};
pub use info::{
CompositionStrategyKind, CompositionSummary, CrushedComponentInfo, UniverseBuildInfo,
UniverseBuildInfoBuilder,
};
pub mod alignment;
pub use alignment::{AlignAction, AlignmentSpec, MeasurementAlignment};
pub use measurement_storage::FragmentRef;
use tracing::{debug, info, warn};
pub use universe_of_etlunits::{
MeasurementData, MeasurementDiag, MeasurementPolicyDiag, MeasurementState, MemorySummary,
QualityData, Universe,
};
use crate::{
CanonicalColumnName,
error::{EtlError, EtlResult},
source::EtlUniverseBuildPlan,
};
/// Stateless entry point for constructing a [`Universe`].
///
/// The type carries no fields; all functionality is exposed through the
/// associated function [`UniverseBuilder::build`].
pub struct UniverseBuilder;
impl UniverseBuilder {
    /// Builds a [`Universe`] from the given build plan.
    ///
    /// The build runs in four logged phases:
    ///
    /// 1. Extract fragments from the plan (`extraction::extract_all_fragments`).
    /// 2. Stack the fragments into composed measurements and qualities
    ///    (`stack_all_fragments`), collecting any crushed components.
    /// 3. Populate measurement and quality storage, falling back to a
    ///    freshly constructed unit when the schema has no entry for a composed
    ///    name, then compute every quality derivation declared on the schema.
    /// 4. Hand everything to `assembly::assemble_universe` together with the
    ///    elapsed build time.
    ///
    /// # Errors
    ///
    /// Returns `EtlError::Config` when extraction yields no fragments, or when
    /// a quality derivation names a domain quality that is not present in
    /// storage at the time it is processed. Errors from extraction, stacking,
    /// derived-quality computation and assembly are propagated unchanged.
    #[tracing::instrument(skip(plan), fields(schema = %plan.schema.name, sources = plan.sources.len()))]
    pub fn build(plan: &EtlUniverseBuildPlan) -> EtlResult<Universe> {
        let start = Instant::now();
        let schema = &plan.schema;
        info!("🟢 Starting universe build");

        info!("👉 Phase 1: Extracting fragments");
        let accumulator = extraction::extract_all_fragments(plan)?;
        debug!(
            measurements = accumulator.measurement_count(),
            qualities = accumulator.quality_count(),
            total = accumulator.total_fragments(),
            "✅ Extraction complete"
        );
        if accumulator.is_empty() {
            return Err(EtlError::Config("No fragments extracted".into()));
        }

        info!("👉 Phase 2: Stacking fragments");
        let (composed_measurements, composed_qualities) =
            stack_all_fragments(accumulator, &schema.subject, &schema.time)?;
        // Copy the crushed components now: the composed measurements are
        // consumed by the storage loop below.
        let crushed: Vec<CrushedComponent> = composed_measurements
            .iter()
            .flat_map(|m| m.crushed_components.clone())
            .collect();
        if !crushed.is_empty() {
            warn!(count = crushed.len(), "Components crushed during stacking");
        }
        debug!(
            measurements = composed_measurements.len(),
            qualities = composed_qualities.len(),
            "✅ Stacking complete"
        );

        info!("👉 Phase 3: Building Universe storage");
        // Both maps have a known upper bound on their final size, so reserve
        // once instead of growing incrementally.
        let mut measurements: HashMap<CanonicalColumnName, MeasurementData> =
            HashMap::with_capacity(composed_measurements.len());
        let mut qualities: HashMap<CanonicalColumnName, QualityData> =
            HashMap::with_capacity(composed_qualities.len() + schema.quality_derivations.len());

        for composed in composed_measurements {
            let name = composed.name.clone();
            // Prefer the unit declared in the schema; otherwise synthesize one
            // from the composed measurement's own kind and components.
            let unit = schema.get_measurement(&name).cloned().unwrap_or_else(|| {
                crate::MeasurementUnit::new(
                    schema.subject.clone(),
                    schema.time.clone(),
                    name.clone(),
                    composed.kind,
                )
                .with_components(
                    composed
                        .components
                        .iter()
                        .map(|c| c.as_str().to_string())
                        .collect(),
                )
            });
            // NOTE(review): `composed.signal_policy_stats` is not carried into
            // `MeasurementState::Raw`; confirm whether it should be surfaced
            // elsewhere.
            let measurement_data = MeasurementData {
                unit,
                state: MeasurementState::Raw {
                    fragment: composed.fragment,
                },
            };
            measurements.insert(name, measurement_data);
        }

        for composed in composed_qualities {
            let name = composed.name.clone();
            let unit = schema.get_quality(&name).cloned().unwrap_or_else(|| {
                crate::QualityUnit::new(schema.subject.clone(), name.clone())
            });
            let quality_data = QualityData::new(unit, composed.data);
            qualities.insert(name, quality_data);
        }

        // Derivations are processed in declaration order and each result is
        // inserted immediately, so a later derivation can use an earlier one
        // as its domain.
        for qd in &schema.quality_derivations {
            let domain_qd = qualities.get(&qd.domain).ok_or_else(|| {
                EtlError::Config(format!(
                    "QualityDerivation '{}': domain quality '{}' not found \
                     in universe. Declared base qualities must precede derived \
                     qualities that reference them.",
                    qd.name.as_str(),
                    qd.domain.as_str(),
                ))
            })?;
            let derived_df =
                crate::unit::compute_derived_quality(&domain_qd.data, qd, schema.subject.as_str())?;
            let derived_unit = qd.to_quality_unit(&schema.subject);
            qualities.insert(qd.name.clone(), QualityData::new(derived_unit, derived_df));
        }
        debug!(
            measurements = measurements.len(),
            qualities = qualities.len(),
            quality_derivations = schema.quality_derivations.len(),
            "✅ Storage populated"
        );

        info!("👉 Phase 4: Assembling universe");
        let universe = assembly::assemble_universe(
            measurements,
            qualities,
            schema.clone(),
            plan,
            crushed,
            start.elapsed(),
        )?;
        info!(
            measurements = universe.measurement_count(),
            qualities = universe.quality_count(),
            duration_ms = universe.build_info().build_duration.as_millis(),
            "✅ Universe build complete"
        );
        Ok(universe)
    }
}