use std::collections::{HashMap, HashSet};
#[derive(Debug, Clone, PartialEq)]
pub struct ProvenancedTriple {
pub subject: String,
pub predicate: String,
pub object: String,
pub source_id: String,
pub confidence: f64,
pub timestamp: u64,
}
impl ProvenancedTriple {
pub fn new(
subject: impl Into<String>,
predicate: impl Into<String>,
object: impl Into<String>,
source_id: impl Into<String>,
confidence: f64,
) -> Self {
Self {
subject: subject.into(),
predicate: predicate.into(),
object: object.into(),
source_id: source_id.into(),
confidence: confidence.clamp(0.0, 1.0),
timestamp: 0,
}
}
pub fn with_timestamp(mut self, ts: u64) -> Self {
self.timestamp = ts;
self
}
pub fn triple_key(&self) -> (String, String, String) {
(
self.subject.clone(),
self.predicate.clone(),
self.object.clone(),
)
}
}
#[derive(Debug, Clone)]
pub struct SourceQuality {
pub source_id: String,
pub accuracy: f64,
pub completeness: f64,
pub timeliness: f64,
pub overall: f64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictStrategy {
Voting,
Recency,
Authority,
AverageConfidence,
}
#[derive(Debug, Clone)]
pub struct FusionConfig {
pub conflict_strategy: ConflictStrategy,
pub min_confidence: f64,
pub accuracy_weight: f64,
pub completeness_weight: f64,
pub timeliness_weight: f64,
pub entity_alignment_threshold: f64,
}
impl Default for FusionConfig {
fn default() -> Self {
Self {
conflict_strategy: ConflictStrategy::Voting,
min_confidence: 0.3,
accuracy_weight: 0.5,
completeness_weight: 0.3,
timeliness_weight: 0.2,
entity_alignment_threshold: 0.8,
}
}
}
#[derive(Debug, Clone)]
pub struct FusedTriple {
pub subject: String,
pub predicate: String,
pub object: String,
pub confidence: f64,
pub sources: Vec<String>,
pub support_count: usize,
}
#[derive(Debug, Clone)]
pub struct FusionStats {
pub input_triple_count: usize,
pub unique_triple_keys: usize,
pub output_triple_count: usize,
pub conflicts_resolved: usize,
pub source_count: usize,
pub mean_confidence: f64,
pub aligned_entity_pairs: usize,
}
#[derive(Debug, Clone)]
pub struct FusionResult {
pub triples: Vec<FusedTriple>,
pub stats: FusionStats,
pub provenance: HashMap<(String, String, String), Vec<String>>,
}
pub struct KnowledgeFusion {
config: FusionConfig,
sources: HashMap<String, SourceQuality>,
total_fusions: u64,
}
impl KnowledgeFusion {
pub fn new(config: FusionConfig) -> Self {
Self {
config,
sources: HashMap::new(),
total_fusions: 0,
}
}
pub fn register_source(
&mut self,
source_id: impl Into<String>,
accuracy: f64,
completeness: f64,
timeliness: f64,
) {
let source_id = source_id.into();
let overall = self.config.accuracy_weight * accuracy
+ self.config.completeness_weight * completeness
+ self.config.timeliness_weight * timeliness;
self.sources.insert(
source_id.clone(),
SourceQuality {
source_id,
accuracy: accuracy.clamp(0.0, 1.0),
completeness: completeness.clamp(0.0, 1.0),
timeliness: timeliness.clamp(0.0, 1.0),
overall: overall.clamp(0.0, 1.0),
},
);
}
pub fn fuse(&mut self, triples: &[ProvenancedTriple]) -> FusionResult {
let mut groups: HashMap<(String, String, String), Vec<&ProvenancedTriple>> = HashMap::new();
for t in triples {
groups.entry(t.triple_key()).or_default().push(t);
}
let unique_keys = groups.len();
let source_ids: HashSet<&str> = triples.iter().map(|t| t.source_id.as_str()).collect();
let source_count = source_ids.len();
let mut fused_triples: Vec<FusedTriple> = Vec::new();
let mut provenance_map: HashMap<(String, String, String), Vec<String>> = HashMap::new();
let mut conflicts_resolved = 0;
for (key, group) in &groups {
let sources_for_key: Vec<String> = group.iter().map(|t| t.source_id.clone()).collect();
provenance_map.insert(key.clone(), sources_for_key.clone());
let fused_confidence = self.resolve_confidence(group);
if fused_confidence >= self.config.min_confidence {
let support = group.len();
fused_triples.push(FusedTriple {
subject: key.0.clone(),
predicate: key.1.clone(),
object: key.2.clone(),
confidence: fused_confidence,
sources: sources_for_key,
support_count: support,
});
}
}
let mut sp_map: HashMap<(String, String), Vec<String>> = HashMap::new();
for ft in &fused_triples {
sp_map
.entry((ft.subject.clone(), ft.predicate.clone()))
.or_default()
.push(ft.object.clone());
}
let resolved_triples = fused_triples.clone();
for ((_s, _p), objects) in &sp_map {
if objects.len() > 1 {
conflicts_resolved += 1;
}
}
let mean_confidence = if resolved_triples.is_empty() {
0.0
} else {
resolved_triples.iter().map(|t| t.confidence).sum::<f64>()
/ resolved_triples.len() as f64
};
self.total_fusions += 1;
FusionResult {
triples: resolved_triples,
stats: FusionStats {
input_triple_count: triples.len(),
unique_triple_keys: unique_keys,
output_triple_count: fused_triples.len(),
conflicts_resolved,
source_count,
mean_confidence,
aligned_entity_pairs: 0,
},
provenance: provenance_map,
}
}
pub fn fuse_incremental(
&mut self,
existing: &[FusedTriple],
new_triples: &[ProvenancedTriple],
) -> FusionResult {
let mut all: Vec<ProvenancedTriple> = Vec::new();
for ft in existing {
let source_id = ft
.sources
.first()
.cloned()
.unwrap_or_else(|| "unknown".to_string());
all.push(ProvenancedTriple::new(
&ft.subject,
&ft.predicate,
&ft.object,
source_id,
ft.confidence,
));
}
all.extend(new_triples.iter().cloned());
self.fuse(&all)
}
pub fn align_entities(&self, triples: &[ProvenancedTriple]) -> Vec<(String, String)> {
let mut entities_by_source: HashMap<&str, HashSet<&str>> = HashMap::new();
for t in triples {
entities_by_source
.entry(&t.source_id)
.or_default()
.insert(&t.subject);
}
let source_ids: Vec<&&str> = entities_by_source.keys().collect();
let mut alignments: Vec<(String, String)> = Vec::new();
for i in 0..source_ids.len() {
for j in (i + 1)..source_ids.len() {
let entities_a = &entities_by_source[source_ids[i]];
let entities_b = &entities_by_source[source_ids[j]];
for &ea in entities_a {
for &eb in entities_b {
let sim = normalized_levenshtein(ea, eb);
if sim >= self.config.entity_alignment_threshold && ea != eb {
alignments.push((ea.to_string(), eb.to_string()));
}
}
}
}
}
alignments
}
pub fn source_quality(&self, source_id: &str) -> Option<&SourceQuality> {
self.sources.get(source_id)
}
pub fn total_fusions(&self) -> u64 {
self.total_fusions
}
pub fn registered_source_count(&self) -> usize {
self.sources.len()
}
fn resolve_confidence(&self, group: &[&ProvenancedTriple]) -> f64 {
match self.config.conflict_strategy {
ConflictStrategy::Voting => {
let max_possible = self.sources.len().max(group.len()) as f64;
if max_possible == 0.0 {
group.iter().map(|t| t.confidence).sum::<f64>() / group.len().max(1) as f64
} else {
group.len() as f64 / max_possible
}
}
ConflictStrategy::Recency => {
group
.iter()
.max_by_key(|t| t.timestamp)
.map(|t| t.confidence)
.unwrap_or(0.0)
}
ConflictStrategy::Authority => {
group
.iter()
.filter_map(|t| {
self.sources
.get(&t.source_id)
.map(|sq| sq.overall * t.confidence)
})
.fold(0.0_f64, f64::max)
.max(
group.iter().map(|t| t.confidence).fold(0.0_f64, f64::max),
)
}
ConflictStrategy::AverageConfidence => {
let sum: f64 = group.iter().map(|t| t.confidence).sum();
sum / group.len().max(1) as f64
}
}
}
}
fn normalized_levenshtein(a: &str, b: &str) -> f64 {
if a == b {
return 1.0;
}
let max_len = a.len().max(b.len());
if max_len == 0 {
return 1.0;
}
let dist = levenshtein_distance(a, b);
1.0 - (dist as f64 / max_len as f64)
}
fn levenshtein_distance(a: &str, b: &str) -> usize {
let a_chars: Vec<char> = a.chars().collect();
let b_chars: Vec<char> = b.chars().collect();
let m = a_chars.len();
let n = b_chars.len();
let mut prev = (0..=n).collect::<Vec<usize>>();
let mut curr = vec![0; n + 1];
for i in 1..=m {
curr[0] = i;
for j in 1..=n {
let cost = if a_chars[i - 1] == b_chars[j - 1] {
0
} else {
1
};
curr[j] = (prev[j] + 1).min(curr[j - 1] + 1).min(prev[j - 1] + cost);
}
std::mem::swap(&mut prev, &mut curr);
}
prev[n]
}
#[cfg(test)]
mod tests {
use super::*;
fn default_fusion() -> KnowledgeFusion {
KnowledgeFusion::new(FusionConfig::default())
}
fn sample_triples() -> Vec<ProvenancedTriple> {
vec![
ProvenancedTriple::new("Alice", "knows", "Bob", "src1", 0.9),
ProvenancedTriple::new("Alice", "knows", "Bob", "src2", 0.8),
ProvenancedTriple::new("Bob", "likes", "Music", "src1", 0.7),
]
}
#[test]
fn test_provenanced_triple_creation() {
let t = ProvenancedTriple::new("A", "B", "C", "src", 0.5);
assert_eq!(t.subject, "A");
assert_eq!(t.predicate, "B");
assert_eq!(t.object, "C");
assert_eq!(t.source_id, "src");
assert!((t.confidence - 0.5).abs() < 1e-10);
}
#[test]
fn test_confidence_clamped() {
let t = ProvenancedTriple::new("A", "B", "C", "src", 1.5);
assert!((t.confidence - 1.0).abs() < 1e-10);
}
#[test]
fn test_triple_key() {
let t = ProvenancedTriple::new("A", "B", "C", "src", 0.5);
assert_eq!(
t.triple_key(),
("A".to_string(), "B".to_string(), "C".to_string())
);
}
#[test]
fn test_with_timestamp() {
let t = ProvenancedTriple::new("A", "B", "C", "src", 0.5).with_timestamp(1000);
assert_eq!(t.timestamp, 1000);
}
#[test]
fn test_register_source() {
let mut f = default_fusion();
f.register_source("src1", 0.9, 0.8, 0.7);
assert_eq!(f.registered_source_count(), 1);
}
#[test]
fn test_source_quality_retrieval() {
let mut f = default_fusion();
f.register_source("src1", 0.9, 0.8, 0.7);
let q = f.source_quality("src1").expect("should exist");
assert!((q.accuracy - 0.9).abs() < 1e-10);
assert!((q.completeness - 0.8).abs() < 1e-10);
}
#[test]
fn test_source_quality_overall() {
let mut f = default_fusion();
f.register_source("src1", 1.0, 1.0, 1.0);
let q = f.source_quality("src1").expect("should exist");
assert!((q.overall - 1.0).abs() < 1e-10);
}
#[test]
fn test_unknown_source_returns_none() {
let f = default_fusion();
assert!(f.source_quality("nonexistent").is_none());
}
#[test]
fn test_fuse_deduplicates() {
let mut f = default_fusion();
let triples = sample_triples();
let result = f.fuse(&triples);
assert_eq!(result.stats.unique_triple_keys, 2);
}
#[test]
fn test_fuse_input_count() {
let mut f = default_fusion();
let result = f.fuse(&sample_triples());
assert_eq!(result.stats.input_triple_count, 3);
}
#[test]
fn test_fuse_source_count() {
let mut f = default_fusion();
let result = f.fuse(&sample_triples());
assert_eq!(result.stats.source_count, 2); }
#[test]
fn test_fused_triple_has_support_count() {
let mut f = default_fusion();
let result = f.fuse(&sample_triples());
let alice_bob = result
.triples
.iter()
.find(|t| t.subject == "Alice" && t.object == "Bob");
assert!(alice_bob.is_some());
assert_eq!(alice_bob.map(|t| t.support_count).unwrap_or(0), 2);
}
#[test]
fn test_fused_triple_has_sources() {
let mut f = default_fusion();
let result = f.fuse(&sample_triples());
let alice_bob = result
.triples
.iter()
.find(|t| t.subject == "Alice" && t.object == "Bob")
.expect("should find fused triple");
assert!(alice_bob.sources.contains(&"src1".to_string()));
assert!(alice_bob.sources.contains(&"src2".to_string()));
}
#[test]
fn test_provenance_map_populated() {
let mut f = default_fusion();
let result = f.fuse(&sample_triples());
let key = ("Alice".to_string(), "knows".to_string(), "Bob".to_string());
let sources = result.provenance.get(&key).expect("should have provenance");
assert_eq!(sources.len(), 2);
}
#[test]
fn test_voting_strategy() {
let mut f = KnowledgeFusion::new(FusionConfig {
conflict_strategy: ConflictStrategy::Voting,
min_confidence: 0.0,
..FusionConfig::default()
});
let result = f.fuse(&sample_triples());
assert!(!result.triples.is_empty());
}
#[test]
fn test_recency_strategy() {
let mut f = KnowledgeFusion::new(FusionConfig {
conflict_strategy: ConflictStrategy::Recency,
min_confidence: 0.0,
..FusionConfig::default()
});
let triples = vec![
ProvenancedTriple::new("A", "p", "B", "s1", 0.5).with_timestamp(100),
ProvenancedTriple::new("A", "p", "B", "s2", 0.9).with_timestamp(200),
];
let result = f.fuse(&triples);
let fused = &result.triples[0];
assert!((fused.confidence - 0.9).abs() < 1e-10);
}
#[test]
fn test_authority_strategy() {
let mut f = KnowledgeFusion::new(FusionConfig {
conflict_strategy: ConflictStrategy::Authority,
min_confidence: 0.0,
..FusionConfig::default()
});
f.register_source("high_quality", 1.0, 1.0, 1.0);
f.register_source("low_quality", 0.1, 0.1, 0.1);
let triples = vec![
ProvenancedTriple::new("A", "p", "B", "high_quality", 0.8),
ProvenancedTriple::new("A", "p", "B", "low_quality", 0.8),
];
let result = f.fuse(&triples);
assert!(!result.triples.is_empty());
assert!(result.triples[0].confidence > 0.0);
}
#[test]
fn test_average_confidence_strategy() {
let mut f = KnowledgeFusion::new(FusionConfig {
conflict_strategy: ConflictStrategy::AverageConfidence,
min_confidence: 0.0,
..FusionConfig::default()
});
let triples = vec![
ProvenancedTriple::new("A", "p", "B", "s1", 0.6),
ProvenancedTriple::new("A", "p", "B", "s2", 0.8),
];
let result = f.fuse(&triples);
assert!((result.triples[0].confidence - 0.7).abs() < 1e-10);
}
#[test]
fn test_min_confidence_filters_low() {
let mut f = KnowledgeFusion::new(FusionConfig {
conflict_strategy: ConflictStrategy::AverageConfidence,
min_confidence: 0.8,
..FusionConfig::default()
});
let triples = vec![ProvenancedTriple::new("A", "p", "B", "s1", 0.3)];
let result = f.fuse(&triples);
assert!(
result.triples.is_empty(),
"low confidence should be filtered"
);
}
#[test]
fn test_incremental_fusion() {
let mut f = KnowledgeFusion::new(FusionConfig {
conflict_strategy: ConflictStrategy::AverageConfidence,
min_confidence: 0.0,
..FusionConfig::default()
});
let existing = vec![FusedTriple {
subject: "A".into(),
predicate: "p".into(),
object: "B".into(),
confidence: 0.8,
sources: vec!["s1".into()],
support_count: 1,
}];
let new_triples = vec![
ProvenancedTriple::new("A", "p", "B", "s2", 0.9),
ProvenancedTriple::new("C", "q", "D", "s2", 0.7),
];
let result = f.fuse_incremental(&existing, &new_triples);
assert!(result.triples.len() >= 2, "should have at least 2 triples");
}
#[test]
fn test_incremental_increases_support() {
let mut f = KnowledgeFusion::new(FusionConfig {
conflict_strategy: ConflictStrategy::AverageConfidence,
min_confidence: 0.0,
..FusionConfig::default()
});
let existing = vec![FusedTriple {
subject: "A".into(),
predicate: "p".into(),
object: "B".into(),
confidence: 0.8,
sources: vec!["s1".into()],
support_count: 1,
}];
let new = vec![ProvenancedTriple::new("A", "p", "B", "s2", 0.9)];
let result = f.fuse_incremental(&existing, &new);
let ab = result
.triples
.iter()
.find(|t| t.subject == "A" && t.object == "B")
.expect("should exist");
assert_eq!(ab.support_count, 2);
}
#[test]
fn test_align_entities_similar_names() {
let f = KnowledgeFusion::new(FusionConfig {
entity_alignment_threshold: 0.8,
..FusionConfig::default()
});
let triples = vec![
ProvenancedTriple::new("Alice_Smith", "knows", "Bob", "s1", 0.9),
ProvenancedTriple::new("Alice_Smit", "knows", "Carol", "s2", 0.8),
];
let alignments = f.align_entities(&triples);
assert!(!alignments.is_empty(), "should detect similar entity names");
}
#[test]
fn test_align_entities_exact_same_not_aligned() {
let f = default_fusion();
let triples = vec![
ProvenancedTriple::new("Alice", "knows", "Bob", "s1", 0.9),
ProvenancedTriple::new("Alice", "likes", "Carol", "s2", 0.8),
];
let alignments = f.align_entities(&triples);
assert!(
alignments.is_empty(),
"exact same names should not produce alignment"
);
}
#[test]
fn test_align_entities_completely_different() {
let f = default_fusion();
let triples = vec![
ProvenancedTriple::new("Alice", "knows", "Bob", "s1", 0.9),
ProvenancedTriple::new("Xyz123", "likes", "Carol", "s2", 0.8),
];
let alignments = f.align_entities(&triples);
assert!(alignments.is_empty());
}
#[test]
fn test_total_fusions_initially_zero() {
let f = default_fusion();
assert_eq!(f.total_fusions(), 0);
}
#[test]
fn test_total_fusions_increments() {
let mut f = default_fusion();
f.fuse(&sample_triples());
f.fuse(&sample_triples());
assert_eq!(f.total_fusions(), 2);
}
#[test]
fn test_fuse_empty() {
let mut f = default_fusion();
let result = f.fuse(&[]);
assert!(result.triples.is_empty());
assert_eq!(result.stats.input_triple_count, 0);
}
#[test]
fn test_config_default_values() {
let config = FusionConfig::default();
assert_eq!(config.conflict_strategy, ConflictStrategy::Voting);
assert!((config.min_confidence - 0.3).abs() < 1e-10);
assert!((config.entity_alignment_threshold - 0.8).abs() < 1e-10);
}
#[test]
fn test_fusion_stats_mean_confidence() {
let mut f = KnowledgeFusion::new(FusionConfig {
conflict_strategy: ConflictStrategy::AverageConfidence,
min_confidence: 0.0,
..FusionConfig::default()
});
let triples = vec![
ProvenancedTriple::new("A", "p", "B", "s1", 0.6),
ProvenancedTriple::new("C", "q", "D", "s1", 0.8),
];
let result = f.fuse(&triples);
assert!(result.stats.mean_confidence > 0.0);
}
#[test]
fn test_single_source_fusion() {
let mut f = KnowledgeFusion::new(FusionConfig {
conflict_strategy: ConflictStrategy::AverageConfidence,
min_confidence: 0.0,
..FusionConfig::default()
});
let triples = vec![ProvenancedTriple::new("A", "p", "B", "s1", 0.9)];
let result = f.fuse(&triples);
assert_eq!(result.triples.len(), 1);
assert_eq!(result.stats.source_count, 1);
}
}