use std::collections::{BTreeMap, BTreeSet};
use chrono::Utc;
use faultline_core::{ChangeEvent, ChangeKind, DatasetSnapshot, FeatureRef};
use uuid::Uuid;
use crate::error::FaultlineDiffError;
use crate::model::{DiffArtifact, DiffClassification, DiffSummary, SnapshotState};
/// Produces a [`DiffArtifact`] describing how one captured snapshot differs from another.
///
/// Implementors decide how features are matched between the two sides and which
/// kinds of change events they emit.
pub trait DiffEngine {
    /// Diffs the `left` snapshot against the `right` snapshot.
    ///
    /// # Errors
    ///
    /// Returns a [`FaultlineDiffError`] when the two snapshots cannot be compared.
    fn compare_snapshots(
        &self,
        left: &SnapshotState,
        right: &SnapshotState,
    ) -> Result<DiffArtifact, FaultlineDiffError>;
}
/// Stateless [`DiffEngine`] that matches features by a stable key and reports
/// additions, removals and lineage changes.
///
/// The type carries no data, so it is freely `Copy`able and all instances are equal.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct BasicDiffEngine;
impl BasicDiffEngine {
    /// Creates a new engine; equivalent to `BasicDiffEngine::default()`.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }

    /// Returns the key used to match a feature across two snapshots.
    ///
    /// The feature's `business_key` is preferred. When it is absent, the string
    /// form of `feature_id` is used instead.
    fn stable_key(reference: &FeatureRef) -> String {
        reference
            .business_key
            .clone()
            .unwrap_or_else(|| reference.feature_id.to_string())
    }

    /// Indexes `features` by [`Self::stable_key`], cloning each feature into the map.
    ///
    /// If several features share the same key, only one of them is kept. Callers
    /// can detect this by comparing the map's length with `features.len()`.
    fn to_feature_map(features: &[FeatureRef]) -> BTreeMap<String, FeatureRef> {
        features
            .iter()
            // Compute the key from the borrow, then clone the feature once.
            .map(|feature| (Self::stable_key(feature), feature.clone()))
            .collect()
    }

    /// Ensures both snapshots belong to the same dataset.
    ///
    /// # Errors
    ///
    /// Returns [`FaultlineDiffError::DatasetMismatch`] carrying both `dataset_id`
    /// values when they differ.
    fn validate_dataset_ids(
        left_snapshot: &DatasetSnapshot,
        right_snapshot: &DatasetSnapshot,
    ) -> Result<(), FaultlineDiffError> {
        if left_snapshot.dataset_id != right_snapshot.dataset_id {
            return Err(FaultlineDiffError::DatasetMismatch {
                left: left_snapshot.dataset_id.clone(),
                right: right_snapshot.dataset_id.clone(),
            });
        }
        Ok(())
    }
}
impl DiffEngine for BasicDiffEngine {
    /// Matches features by stable key (business key, else feature id) and emits
    /// one [`ChangeEvent`] per added, removed, or lineage-changed feature.
    ///
    /// Events are ordered by stable key. A feature present on both sides with an
    /// identical `lineage_hint` produces no event. The artifact is classified as
    /// `NoChanges` when no events were emitted, otherwise `Deterministic`.
    ///
    /// # Errors
    ///
    /// - [`FaultlineDiffError::DatasetMismatch`] if the snapshots belong to
    ///   different datasets.
    /// - [`FaultlineDiffError::InvalidInput`] if either snapshot contains two
    ///   features with the same stable key, because the match would be ambiguous.
    fn compare_snapshots(
        &self,
        left: &SnapshotState,
        right: &SnapshotState,
    ) -> Result<DiffArtifact, FaultlineDiffError> {
        Self::validate_dataset_ids(&left.snapshot, &right.snapshot)?;

        let left_map = Self::to_feature_map(&left.features);
        let right_map = Self::to_feature_map(&right.features);

        // Keying into a map collapses features that share a stable key. Those
        // features would silently vanish from the diff and skew the summary
        // counts, so reject such input explicitly instead.
        if left_map.len() != left.features.len() {
            return Err(FaultlineDiffError::InvalidInput(
                "left snapshot contains duplicate feature keys",
            ));
        }
        if right_map.len() != right.features.len() {
            return Err(FaultlineDiffError::InvalidInput(
                "right snapshot contains duplicate feature keys",
            ));
        }

        // Sorted union of keys from both sides; this fixes the event order.
        let keys: BTreeSet<&str> = left_map
            .keys()
            .chain(right_map.keys())
            .map(String::as_str)
            .collect();

        let mut change_events = Vec::new();
        for key in keys {
            match (left_map.get(key), right_map.get(key)) {
                (None, Some(added)) => {
                    let mut event = ChangeEvent::new(ChangeKind::Added, "feature inserted");
                    event.right_refs.push(added.clone());
                    event.confidence = 1.0;
                    change_events.push(event);
                }
                (Some(removed), None) => {
                    let mut event = ChangeEvent::new(ChangeKind::Removed, "feature removed");
                    event.left_refs.push(removed.clone());
                    event.confidence = 1.0;
                    change_events.push(event);
                }
                (Some(left_feature), Some(right_feature)) => {
                    // Only `lineage_hint` is compared; other attribute differences
                    // are not detected by this engine.
                    if left_feature.lineage_hint != right_feature.lineage_hint {
                        let mut event = ChangeEvent::new(
                            ChangeKind::AttributeChanged,
                            "feature lineage changed",
                        );
                        event.left_refs.push(left_feature.clone());
                        event.right_refs.push(right_feature.clone());
                        event.confidence = 0.8;
                        change_events.push(event);
                    }
                }
                // Cannot happen: every key was taken from at least one of the maps.
                (None, None) => {
                    return Err(FaultlineDiffError::InvalidInput(
                        "diff key present with no left or right feature",
                    ));
                }
            }
        }

        let mut summary = DiffSummary::empty();
        summary.diff_id = Uuid::new_v4();
        for event in &change_events {
            match event.change_kind {
                ChangeKind::Added => summary.added += 1,
                ChangeKind::Removed => summary.removed += 1,
                _ => summary.changed += 1,
            }
        }

        let classification = if change_events.is_empty() {
            DiffClassification::NoChanges
        } else {
            DiffClassification::Deterministic
        };

        Ok(DiffArtifact {
            artifact_id: Uuid::new_v4(),
            generated_at: Utc::now(),
            classification,
            left_snapshot_id: left.snapshot.snapshot_id,
            right_snapshot_id: right.snapshot.snapshot_id,
            summary,
            change_events,
        })
    }
}
#[cfg(test)]
mod tests {
    use faultline_core::{ChangeKind, DatasetSnapshot, FeatureRef};

    use super::{BasicDiffEngine, DiffEngine};
    use crate::model::{DiffClassification, SnapshotState};

    /// Builds a feature with an explicit business key and lineage hint.
    fn feature(dataset_id: &str, business_key: &str, lineage_hint: &str) -> FeatureRef {
        let mut feature = FeatureRef::new(dataset_id.to_string());
        feature.business_key = Some(business_key.to_string());
        feature.lineage_hint = Some(lineage_hint.to_string());
        feature
    }

    #[test]
    fn compare_snapshots_detects_add_remove_and_change() {
        let left = SnapshotState {
            snapshot: DatasetSnapshot::new("parcels"),
            features: vec![
                feature("parcels", "A", "v1"),
                feature("parcels", "B", "v1"),
                feature("parcels", "C", "v1"),
            ],
        };
        let right = SnapshotState {
            snapshot: DatasetSnapshot::new("parcels"),
            features: vec![
                feature("parcels", "A", "v2"),
                feature("parcels", "C", "v1"),
                feature("parcels", "D", "v1"),
            ],
        };
        let engine = BasicDiffEngine::new();
        let artifact = engine
            .compare_snapshots(&left, &right)
            .expect("diff compare should succeed");

        assert_eq!(artifact.summary.added, 1);
        assert_eq!(artifact.summary.removed, 1);
        assert_eq!(artifact.summary.changed, 1);
        assert_eq!(artifact.change_events.len(), 3);
        assert!(matches!(
            artifact.classification,
            DiffClassification::Deterministic
        ));

        // Events follow stable-key order: A (changed), B (removed), D (added).
        assert!(matches!(
            artifact.change_events[0].change_kind,
            ChangeKind::AttributeChanged
        ));
        assert!(matches!(
            artifact.change_events[1].change_kind,
            ChangeKind::Removed
        ));
        assert!(matches!(
            artifact.change_events[2].change_kind,
            ChangeKind::Added
        ));
    }

    #[test]
    fn compare_snapshots_identical_inputs_report_no_changes() {
        let left = SnapshotState {
            snapshot: DatasetSnapshot::new("parcels"),
            features: vec![feature("parcels", "A", "v1")],
        };
        let right = SnapshotState {
            snapshot: DatasetSnapshot::new("parcels"),
            features: vec![feature("parcels", "A", "v1")],
        };
        let engine = BasicDiffEngine::new();
        let artifact = engine
            .compare_snapshots(&left, &right)
            .expect("diff compare should succeed");

        assert_eq!(artifact.summary.added, 0);
        assert_eq!(artifact.summary.removed, 0);
        assert_eq!(artifact.summary.changed, 0);
        assert_eq!(artifact.change_events.len(), 0);
        // Identical inputs must be classified as `NoChanges`, not `Deterministic`.
        assert!(matches!(
            artifact.classification,
            DiffClassification::NoChanges
        ));
    }
}