use std::collections::{HashMap, HashSet};
use super::adapter::RdfGraphAdapter;
use super::centrality::{BetweennessCentrality, DegreeCentrality, PageRank};
use super::components::ConnectedComponents;
/// Which incident edges are counted when computing degree centrality.
///
/// The enum is fieldless, so it is `Copy` and can be compared and hashed directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DegreeDirection {
    /// Count only edges that arrive at the node.
    Incoming,
    /// Count only edges that leave the node.
    Outgoing,
    /// Count edges in either direction.
    Both,
}
/// Selects which graph analytic `GraphAnalyticsAccumulator::finalize` computes
/// over the accumulated edges.
#[derive(Debug, Clone)]
pub enum GraphAnalyticsAggregate {
/// PageRank score per node.
PageRank {
/// Damping factor, forwarded to `PageRank::with_damping`.
damping: f64,
/// Iteration cap, forwarded to `PageRank::with_max_iter`.
iterations: usize,
},
/// Betweenness centrality per node.
BetweennessCentrality {
/// Forwarded unchanged to the `normalized` field of `BetweennessCentrality`.
normalized: bool,
},
/// Weakly-connected component membership; each node's value is the index of
/// its component in the list returned by `ConnectedComponents::weakly_connected`.
ConnectedComponent,
/// Local clustering coefficient per node, computed by
/// `compute_clustering_coefficient` with edge direction ignored.
ClusteringCoefficient,
/// Degree centrality per node.
DegreeCentrality {
/// Which incident edges are counted.
direction: DegreeDirection,
},
}
/// Errors reported by the graph-analytics accumulator.
#[derive(Debug)]
pub enum GraphAnalyticsError {
/// `finalize` was called before any edge was accepted by `accumulate`.
EmptyGraph,
/// An algorithm failed; the payload is a free-form message.
/// NOTE(review): not constructed anywhere in this file.
AlgorithmError(String),
/// A requested node is unknown; the payload is printed after "node not found: ".
/// NOTE(review): not constructed anywhere in this file; `finalize_for_node`
/// reports a missing node as `Ok(None)` instead.
NodeNotFound(String),
}
impl std::fmt::Display for GraphAnalyticsError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::EmptyGraph => f.write_str("graph analytics: accumulated edge set is empty"),
Self::AlgorithmError(msg) => write!(f, "graph analytics algorithm error: {msg}"),
Self::NodeNotFound(iri) => write!(f, "graph analytics: node not found: {iri}"),
}
}
}
// Marker impl: uses the trait's default methods only, relying on the `Debug`
// derive and the `Display` impl of `GraphAnalyticsError`.
impl std::error::Error for GraphAnalyticsError {}
/// Cheap syntactic check used to keep literals out of the graph.
///
/// Returns `true` when `s` does not begin with a double quote and contains at
/// least one `:`. No further validation is performed.
#[inline]
fn is_iri_like(s: &str) -> bool {
    // A leading quote marks a literal, whatever follows it.
    if s.starts_with('"') {
        return false;
    }
    s.contains(':')
}
/// Collects directed subject/object edges and computes one graph analytic over
/// them when `finalize` is called.
pub struct GraphAnalyticsAccumulator {
// Accepted `(subject, object)` pairs in insertion order; repeated pairs are kept.
edges: Vec<(String, String)>,
// Every distinct endpoint of an accepted edge; backs `node_count`.
node_set: HashSet<String>,
// The analytic that `finalize` dispatches on.
kind: GraphAnalyticsAggregate,
}
impl GraphAnalyticsAccumulator {
    /// Predicate given to every accumulated edge when the edge list is turned into
    /// triples for `RdfGraphAdapter::from_triples`; the accumulator itself only stores
    /// subject/object pairs.
    const SYNTHETIC_PREDICATE: &'static str = "ex:aggregateEdge";

    /// Creates an empty accumulator that will compute `kind` when finalized.
    pub fn new(kind: GraphAnalyticsAggregate) -> Self {
        Self {
            edges: Vec::new(),
            node_set: HashSet::new(),
            kind,
        }
    }

    /// Records the directed edge `subject -> object`.
    ///
    /// The pair is silently ignored unless both terms pass `is_iri_like`. Repeated
    /// pairs are stored again, so `edge_count` counts calls rather than distinct edges.
    pub fn accumulate(&mut self, subject: &str, object: &str) {
        if !is_iri_like(subject) || !is_iri_like(object) {
            return;
        }
        for term in [subject, object] {
            // Probe with the borrowed `&str` first so an already-known node costs no
            // allocation.
            if !self.node_set.contains(term) {
                self.node_set.insert(term.to_owned());
            }
        }
        self.edges.push((subject.to_owned(), object.to_owned()));
    }

    /// Number of accepted edges, duplicates included.
    pub fn edge_count(&self) -> usize {
        self.edges.len()
    }

    /// Number of distinct nodes that appear as an endpoint of an accepted edge.
    pub fn node_count(&self) -> usize {
        self.node_set.len()
    }

    /// Computes the configured analytic and returns one value per node IRI.
    ///
    /// The accumulated edges are not consumed, so the method can be called again.
    /// It takes `&mut self` only to keep the existing signature; nothing is mutated.
    ///
    /// # Errors
    ///
    /// Returns [`GraphAnalyticsError::EmptyGraph`] when no edge has been accepted.
    pub fn finalize(&mut self) -> Result<HashMap<String, f64>, GraphAnalyticsError> {
        if self.edges.is_empty() {
            return Err(GraphAnalyticsError::EmptyGraph);
        }

        let triples: Vec<(String, String, String)> = self
            .edges
            .iter()
            .map(|(s, o)| (s.clone(), Self::SYNTHETIC_PREDICATE.to_string(), o.clone()))
            .collect();
        let graph = RdfGraphAdapter::from_triples(&triples);

        // Borrow the configuration; `graph` is an independent local, so no clone of
        // `kind` is needed.
        let scores = match &self.kind {
            GraphAnalyticsAggregate::PageRank {
                damping,
                iterations,
            } => {
                let pr = PageRank::new()
                    .with_damping(*damping)
                    .with_max_iter(*iterations);
                let raw = pr.compute(&graph);
                translate_scores(&graph, &raw)
            }
            GraphAnalyticsAggregate::BetweennessCentrality { normalized } => {
                let bc = BetweennessCentrality {
                    normalized: *normalized,
                };
                let raw = bc.compute(&graph);
                translate_scores(&graph, &raw)
            }
            GraphAnalyticsAggregate::ConnectedComponent => {
                let components = ConnectedComponents::weakly_connected(&graph);
                let mut result: HashMap<String, f64> = HashMap::new();
                // A node's value is the position of its component in `components`.
                for (component_id, component) in components.iter().enumerate() {
                    for &node_id in component {
                        if let Some(iri) = graph.get_node_iri(node_id) {
                            result.insert(iri.to_string(), component_id as f64);
                        }
                    }
                }
                result
            }
            GraphAnalyticsAggregate::ClusteringCoefficient => {
                compute_clustering_coefficient(&graph)
            }
            GraphAnalyticsAggregate::DegreeCentrality { direction } => {
                let raw = match direction {
                    DegreeDirection::Incoming => DegreeCentrality::in_degree(&graph),
                    DegreeDirection::Outgoing => DegreeCentrality::out_degree(&graph),
                    DegreeDirection::Both => DegreeCentrality::total_degree(&graph),
                };
                translate_scores(&graph, &raw)
            }
        };
        Ok(scores)
    }

    /// Runs [`finalize`](Self::finalize) and returns the value for `node` only.
    ///
    /// Yields `Ok(None)` when `node` has no entry in the result map. The whole
    /// analytic is recomputed on every call.
    ///
    /// # Errors
    ///
    /// Propagates any error from `finalize`.
    pub fn finalize_for_node(&mut self, node: &str) -> Result<Option<f64>, GraphAnalyticsError> {
        let scores = self.finalize()?;
        Ok(scores.get(node).copied())
    }
}
/// Re-keys per-node scores from adapter node ids to node IRIs.
///
/// Ids for which `graph.get_node_iri` returns `None` are left out of the result.
fn translate_scores(graph: &RdfGraphAdapter, raw: &HashMap<usize, f64>) -> HashMap<String, f64> {
    let mut by_iri = HashMap::new();
    for (&node_id, &score) in raw {
        if let Some(iri) = graph.get_node_iri(node_id) {
            by_iri.insert(iri.to_string(), score);
        }
    }
    by_iri
}
/// Computes the local clustering coefficient of every node, ignoring edge direction.
///
/// For a node with `k` distinct neighbours (self-loops excluded) the coefficient is
/// the number of neighbour pairs joined by at least one edge, in either direction,
/// divided by `k * (k - 1) / 2`. Nodes with fewer than two neighbours get `0.0`.
/// Nodes for which `graph.get_node_iri` returns `None` are omitted.
///
/// Indexes `graph.adjacency` and `graph.reverse_adjacency` by every id in
/// `0..graph.node_count()`, so both must have an entry for each of those ids.
fn compute_clustering_coefficient(graph: &RdfGraphAdapter) -> HashMap<String, f64> {
    let n = graph.node_count();

    // Undirected neighbourhood of each node, built once up front so the pair test
    // below is a set lookup rather than a per-(node, neighbour) rebuild.
    let neighbourhoods: Vec<HashSet<usize>> = (0..n)
        .map(|v| {
            graph.adjacency[v]
                .iter()
                .map(|&(u, _)| u)
                .chain(graph.reverse_adjacency[v].iter().map(|&(u, _)| u))
                .filter(|&u| u != v)
                .collect()
        })
        .collect();

    let mut result = HashMap::with_capacity(n);
    for (v, neighbours) in neighbourhoods.iter().enumerate() {
        let iri = match graph.get_node_iri(v) {
            Some(iri) => iri.to_string(),
            None => continue,
        };

        let k = neighbours.len();
        if k < 2 {
            result.insert(iri, 0.0);
            continue;
        }

        // Fix an order so each unordered neighbour pair is visited exactly once.
        let ordered: Vec<usize> = neighbours.iter().copied().collect();
        let mut linked_pairs: usize = 0;
        for (i, &u) in ordered.iter().enumerate() {
            let u_neighbours = &neighbourhoods[u];
            linked_pairs += ordered[i + 1..]
                .iter()
                .filter(|w| u_neighbours.contains(*w))
                .count();
        }

        // k >= 2 here, so the denominator is at least 1.
        let possible_pairs = (k * (k - 1)) / 2;
        result.insert(iri, linked_pairs as f64 / possible_pairs as f64);
    }
    result
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building an accumulator of the given kind.
    fn make_acc(kind: GraphAnalyticsAggregate) -> GraphAnalyticsAccumulator {
        GraphAnalyticsAccumulator::new(kind)
    }

    // --- accumulation -----------------------------------------------------------

    #[test]
    fn test_accumulate_tracks_edge_count() {
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Both,
        });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");
        assert_eq!(acc.edge_count(), 2);
        assert_eq!(acc.node_count(), 3);
    }

    #[test]
    fn test_accumulate_drops_literals() {
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Both,
        });
        // Quoted literal object and colon-less subject are both rejected.
        acc.accumulate("ex:A", "\"Alice\"");
        acc.accumulate("plainstring", "ex:B");
        acc.accumulate("ex:A", "ex:B");
        assert_eq!(acc.edge_count(), 1);
        assert_eq!(acc.node_count(), 2);
    }

    // --- finalize_for_node ------------------------------------------------------

    #[test]
    fn test_finalize_for_node_existing() {
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Outgoing,
        });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:A", "ex:C");
        let score = acc
            .finalize_for_node("ex:A")
            .expect("finalize should succeed")
            .expect("ex:A should have a score");
        assert!((score - 1.0).abs() < 1e-9, "score={score}");
    }

    #[test]
    fn test_finalize_for_node_missing_returns_none() {
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Both,
        });
        acc.accumulate("ex:A", "ex:B");
        let result = acc
            .finalize_for_node("ex:Z")
            .expect("finalize should succeed");
        assert!(result.is_none(), "ex:Z was never accumulated");
    }

    // --- PageRank ---------------------------------------------------------------

    #[test]
    fn test_pagerank_4node_cycle_converges() {
        let mut acc = make_acc(GraphAnalyticsAggregate::PageRank {
            damping: 0.85,
            iterations: 100,
        });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");
        acc.accumulate("ex:C", "ex:D");
        acc.accumulate("ex:D", "ex:A");
        let scores = acc.finalize().expect("PageRank should succeed");
        assert_eq!(scores.len(), 4, "4 distinct nodes expected");
        let expected = 1.0 / 4.0;
        for (node, &score) in &scores {
            assert!(
                (score - expected).abs() < 1e-4,
                "node {node}: expected ~{expected}, got {score}"
            );
        }
        let total: f64 = scores.values().sum();
        assert!((total - 1.0).abs() < 1e-4, "sum={total}");
    }

    #[test]
    fn test_pagerank_star_center_highest() {
        let mut acc = make_acc(GraphAnalyticsAggregate::PageRank {
            damping: 0.85,
            iterations: 100,
        });
        acc.accumulate("ex:S1", "ex:Hub");
        acc.accumulate("ex:S2", "ex:Hub");
        acc.accumulate("ex:S3", "ex:Hub");
        acc.accumulate("ex:Hub", "ex:S1");
        let scores = acc.finalize().expect("PageRank should succeed");
        let hub_score = scores["ex:Hub"];
        let s2_score = scores["ex:S2"];
        assert!(
            hub_score > s2_score,
            "hub ({hub_score}) should outrank spoke ({s2_score})"
        );
    }

    #[test]
    fn test_pagerank_empty_graph_error() {
        let mut acc = make_acc(GraphAnalyticsAggregate::PageRank {
            damping: 0.85,
            iterations: 100,
        });
        let err = acc.finalize().expect_err("empty graph should fail");
        assert!(
            matches!(err, GraphAnalyticsError::EmptyGraph),
            "expected EmptyGraph, got {err}"
        );
    }

    /// Runs PageRank on the same 3-cycle with two damping factors.
    ///
    /// Only the sum of the scores is asserted for each damping factor; the two
    /// runs are not compared against each other.
    #[test]
    fn test_pagerank_damping_affects_rank() {
        let edges = [("ex:A", "ex:B"), ("ex:B", "ex:C"), ("ex:C", "ex:A")];
        let mut acc1 = make_acc(GraphAnalyticsAggregate::PageRank {
            damping: 0.5,
            iterations: 200,
        });
        let mut acc2 = make_acc(GraphAnalyticsAggregate::PageRank {
            damping: 0.95,
            iterations: 200,
        });
        for (s, o) in &edges {
            acc1.accumulate(s, o);
            acc2.accumulate(s, o);
        }
        let s1 = acc1.finalize().expect("should succeed");
        let s2 = acc2.finalize().expect("should succeed");
        let sum1: f64 = s1.values().sum();
        let sum2: f64 = s2.values().sum();
        assert!((sum1 - 1.0).abs() < 1e-4, "d=0.5 sum={sum1}");
        assert!((sum2 - 1.0).abs() < 1e-4, "d=0.95 sum={sum2}");
    }

    // --- betweenness centrality -------------------------------------------------

    #[test]
    fn test_betweenness_bridge_node_highest() {
        let mut acc = make_acc(GraphAnalyticsAggregate::BetweennessCentrality { normalized: true });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");
        let scores = acc.finalize().expect("should succeed");
        let b_score = scores["ex:B"];
        let a_score = scores["ex:A"];
        let c_score = scores["ex:C"];
        assert!(
            b_score > a_score,
            "bridge B ({b_score}) should beat A ({a_score})"
        );
        assert!(
            b_score > c_score,
            "bridge B ({b_score}) should beat C ({c_score})"
        );
    }

    #[test]
    fn test_betweenness_complete_graph_equal() {
        let mut acc = make_acc(GraphAnalyticsAggregate::BetweennessCentrality { normalized: true });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:A", "ex:C");
        acc.accumulate("ex:B", "ex:A");
        acc.accumulate("ex:B", "ex:C");
        acc.accumulate("ex:C", "ex:A");
        acc.accumulate("ex:C", "ex:B");
        let scores = acc.finalize().expect("should succeed");
        let vals: Vec<f64> = scores.values().copied().collect();
        let first = vals[0];
        for &v in &vals {
            assert!(
                (v - first).abs() < 1e-9,
                "complete graph: not equal {v} vs {first}"
            );
        }
    }

    #[test]
    fn test_betweenness_normalized_in_range() {
        let mut acc = make_acc(GraphAnalyticsAggregate::BetweennessCentrality { normalized: true });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");
        acc.accumulate("ex:C", "ex:D");
        acc.accumulate("ex:D", "ex:E");
        let scores = acc.finalize().expect("should succeed");
        for (node, &score) in &scores {
            assert!(
                (0.0..=1.0).contains(&score),
                "node {node} score {score} out of [0,1]"
            );
        }
    }

    // --- connected components ---------------------------------------------------

    #[test]
    fn test_connected_component_single_component() {
        let mut acc = make_acc(GraphAnalyticsAggregate::ConnectedComponent);
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");
        let scores = acc.finalize().expect("should succeed");
        for &id in scores.values() {
            assert_eq!(id as usize, 0, "all nodes in component 0");
        }
        assert_eq!(scores.len(), 3);
    }

    #[test]
    fn test_connected_component_two_components() {
        let mut acc = make_acc(GraphAnalyticsAggregate::ConnectedComponent);
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:C", "ex:D");
        let scores = acc.finalize().expect("should succeed");
        let ids: HashSet<usize> = scores.values().map(|&v| v as usize).collect();
        assert_eq!(ids.len(), 2, "exactly 2 distinct component IDs expected");

        // Check membership, not just the number of ids: endpoints of the same edge
        // share a component and the two edges are in different components.
        let id_of = |node: &str| scores[node] as usize;
        assert_eq!(id_of("ex:A"), id_of("ex:B"), "A and B are linked");
        assert_eq!(id_of("ex:C"), id_of("ex:D"), "C and D are linked");
        assert_ne!(id_of("ex:A"), id_of("ex:C"), "the two edges are disjoint");
    }

    /// `accumulate` only records nodes as edge endpoints, so a truly isolated node
    /// cannot be expressed; two disjoint edges are the smallest disconnected input.
    #[test]
    fn test_connected_component_isolated_node() {
        let mut acc = make_acc(GraphAnalyticsAggregate::ConnectedComponent);
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:C", "ex:D");
        let scores = acc.finalize().expect("should succeed");
        let ids: HashSet<usize> = scores.values().map(|&v| v as usize).collect();
        assert_eq!(ids.len(), 2);
    }

    // --- clustering coefficient -------------------------------------------------

    #[test]
    fn test_clustering_complete_graph_is_one() {
        let mut acc = make_acc(GraphAnalyticsAggregate::ClusteringCoefficient);
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");
        acc.accumulate("ex:A", "ex:C");
        let scores = acc.finalize().expect("should succeed");
        for (node, &coeff) in &scores {
            assert!(
                (coeff - 1.0).abs() < 1e-9,
                "K3 node {node}: expected 1.0, got {coeff}"
            );
        }
    }

    #[test]
    fn test_clustering_star_is_zero() {
        let mut acc = make_acc(GraphAnalyticsAggregate::ClusteringCoefficient);
        acc.accumulate("ex:Hub", "ex:S1");
        acc.accumulate("ex:Hub", "ex:S2");
        acc.accumulate("ex:Hub", "ex:S3");
        let scores = acc.finalize().expect("should succeed");
        // Leaves have a single neighbour; the hub has three neighbours with no edge
        // between any pair of them. Every node therefore scores zero.
        for node in &["ex:Hub", "ex:S1", "ex:S2", "ex:S3"] {
            let coeff = scores[*node];
            assert!(coeff < 1e-9, "star node {node}: expected 0.0, got {coeff}");
        }
    }

    // --- degree centrality ------------------------------------------------------

    #[test]
    fn test_degree_outgoing_count() {
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Outgoing,
        });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:A", "ex:C");
        acc.accumulate("ex:B", "ex:C");
        let scores = acc.finalize().expect("should succeed");
        let a = scores["ex:A"];
        let b = scores["ex:B"];
        let c = scores["ex:C"];
        assert!((a - 1.0).abs() < 1e-9, "A out={a}");
        assert!((b - 0.5).abs() < 1e-9, "B out={b}");
        assert!((c - 0.0).abs() < 1e-9, "C out={c}");
    }

    #[test]
    fn test_degree_incoming_count() {
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Incoming,
        });
        acc.accumulate("ex:A", "ex:C");
        acc.accumulate("ex:B", "ex:C");
        acc.accumulate("ex:A", "ex:B");
        let scores = acc.finalize().expect("should succeed");
        let c_score = scores["ex:C"];
        let a_score = scores["ex:A"];
        assert!(
            c_score > a_score,
            "C in={c_score} should exceed A in={a_score}"
        );
    }

    #[test]
    fn test_degree_both_sum() {
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Both,
        });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");
        let scores = acc.finalize().expect("should succeed");
        let b = scores["ex:B"];
        let a = scores["ex:A"];
        let c = scores["ex:C"];
        assert!(b > a, "B total={b} should exceed A total={a}");
        assert!(b > c, "B total={b} should exceed C total={c}");
        assert!((a - c).abs() < 1e-9, "A and C should be equal: a={a} c={c}");
    }
}