use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::column::DomainSignature;
use crate::unit_ref::EtlUnitRef;
use crate::universe::measurement_storage::DataSourceName;
use super::bindings::{CodomainBinding, ColumnBinding};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct SourceKey(usize);
impl SourceKey {
pub fn from_arc<T>(arc: &Arc<T>) -> Self {
Self(Arc::as_ptr(arc) as *const () as usize)
}
pub fn from_raw(value: usize) -> Self {
Self(value)
}
pub fn as_raw(&self) -> usize {
self.0
}
}
impl std::fmt::Display for SourceKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "source#{:x}", self.0)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceMember {
pub unit: EtlUnitRef,
pub value: CodomainBinding,
}
impl SourceMember {
pub fn new(unit: EtlUnitRef, value: CodomainBinding) -> Self {
Self { unit, value }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceContext {
pub source_name: DataSourceName,
pub source_key: SourceKey,
pub subject: ColumnBinding,
pub time: ColumnBinding,
pub components: Vec<ColumnBinding>,
pub members: Vec<SourceMember>,
}
impl SourceContext {
pub fn has_components(&self) -> bool {
!self.components.is_empty()
}
pub fn domain(&self) -> DomainSignature {
DomainSignature::measurement(
self.subject.canonical.as_str(),
self.time.canonical.as_str(),
)
.with_components(
self.components
.iter()
.map(|c| c.canonical.as_str().to_string())
.collect(),
)
}
pub fn measurement_members(&self) -> impl Iterator<Item = &SourceMember> {
self.members.iter().filter(|m| m.unit.is_measurement())
}
pub fn quality_members(&self) -> impl Iterator<Item = &SourceMember> {
self.members.iter().filter(|m| m.unit.is_quality())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::unit::NullValue;
fn make_context() -> SourceContext {
SourceContext {
source_name: DataSourceName::new("scada"),
source_key: SourceKey::from_raw(0xDEAD_BEEF),
subject: ColumnBinding::new("station_id", "station_name"),
time: ColumnBinding::new("obs_time", "timestamp"),
components: vec![],
members: vec![
SourceMember::new(
EtlUnitRef::measurement("sump"),
CodomainBinding::new("sump_reading", "sump"),
),
SourceMember::new(
EtlUnitRef::quality("station_label"),
CodomainBinding::new("display_name", "station_label"),
),
],
}
}
#[test]
fn source_key_from_arc_is_stable_across_clones() {
let arc = Arc::new(42u32);
let k1 = SourceKey::from_arc(&arc);
let k2 = SourceKey::from_arc(&arc.clone());
assert_eq!(k1, k2);
}
#[test]
fn source_key_distinguishes_distinct_arcs() {
let a = Arc::new(1u32);
let b = Arc::new(1u32);
assert_ne!(SourceKey::from_arc(&a), SourceKey::from_arc(&b));
}
#[test]
fn source_key_display_is_hex() {
let k = SourceKey::from_raw(0xDEAD_BEEF);
assert_eq!(format!("{}", k), "source#deadbeef");
}
#[test]
fn has_components_false_when_empty() {
let ctx = make_context();
assert!(!ctx.has_components());
}
#[test]
fn has_components_true_when_present() {
let mut ctx = make_context();
ctx.components
.push(ColumnBinding::identity("engine_number"));
assert!(ctx.has_components());
}
#[test]
fn domain_signature_reflects_canonical_names() {
let ctx = make_context();
let sig = ctx.domain();
assert_eq!(sig.subject.as_str(), "station_name");
assert_eq!(sig.time.as_ref().unwrap().as_str(), "timestamp");
assert!(sig.components.is_empty());
}
#[test]
fn domain_signature_includes_components() {
let mut ctx = make_context();
ctx.components
.push(ColumnBinding::identity("engine_number"));
let sig = ctx.domain();
assert_eq!(sig.components.len(), 1);
assert_eq!(sig.components[0].as_str(), "engine_number");
}
#[test]
fn member_iterators_partition_by_kind() {
let ctx = make_context();
assert_eq!(ctx.measurement_members().count(), 1);
assert_eq!(ctx.quality_members().count(), 1);
}
#[test]
fn null_fills_attach_only_to_codomain() {
let m = SourceMember::new(
EtlUnitRef::measurement("engines_on_count"),
CodomainBinding::new("engine_on", "engines_on_count")
.with_source_null_fill(NullValue::Integer(0))
.with_join_null_fill(NullValue::Integer(0)),
);
assert!(m.value.source_null_fill.is_some());
assert!(m.value.join_null_fill.is_some());
}
#[test]
fn serde_roundtrip_source_context() {
let ctx = make_context();
let json = serde_json::to_string(&ctx).unwrap();
let back: SourceContext = serde_json::from_str(&json).unwrap();
assert_eq!(back.source_name.as_str(), "scada");
assert_eq!(back.subject.canonical.as_str(), "station_name");
assert_eq!(back.members.len(), 2);
}
}