Skip to main content

oxirs_arq/analytics/
graph_analytics_agg.rs

1//! Graph analytics aggregate: collect edges during a GROUP BY pass, then
2//! finalize to per-node topology scores via the existing analytics back-ends.
3//!
4//! # Overview
5//!
6//! [`GraphAnalyticsAccumulator`] acts as the stateful context for a
7//! SPARQL-style custom aggregate function.  The typical lifecycle is:
8//!
9//! 1. Create an accumulator with the desired [`GraphAnalyticsAggregate`] kind.
10//! 2. For every row in the current group, call [`GraphAnalyticsAccumulator::accumulate`]
11//!    with the `(subject, object)` IRI pair that forms a directed edge.
12//! 3. Call [`GraphAnalyticsAccumulator::finalize`] to run the analytics
13//!    algorithm over the collected edge set and receive a
14//!    `HashMap<node_iri, score>`.
15//!
16//! # Edge-set constraints
17//!
//! The underlying [`super::adapter::RdfGraphAdapter`] silently drops edges
//! whose object looks like an RDF literal (starts with `"` or lacks `:`).
//! [`GraphAnalyticsAccumulator::accumulate`] applies the same heuristic up
//! front — to the subject as well as the object — so that [`edge_count`] and
//! `node_count` agree with what `finalize` sees.
22//!
23//! [`edge_count`]: GraphAnalyticsAccumulator::edge_count
24
25use std::collections::{HashMap, HashSet};
26
27use super::adapter::RdfGraphAdapter;
28use super::centrality::{BetweennessCentrality, DegreeCentrality, PageRank};
29use super::components::ConnectedComponents;
30
31// ── Direction selector for degree centrality ──────────────────────────────────
32
/// Edge direction counted by the [`GraphAnalyticsAggregate::DegreeCentrality`] variant.
///
/// This is a plain field-less selector, so it is `Copy` and can be compared,
/// hashed, or passed by value without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DegreeDirection {
    /// Count only incoming edges.
    Incoming,
    /// Count only outgoing edges.
    Outgoing,
    /// Count both (total degree).
    Both,
}
43
44// ── Aggregate variant ─────────────────────────────────────────────────────────
45
46/// Which graph-analytics metric to compute at finalization time.
47///
48/// All variants operate over the directed edge set accumulated by
49/// [`GraphAnalyticsAccumulator`].
50#[derive(Debug, Clone)]
51pub enum GraphAnalyticsAggregate {
52    /// PageRank with configurable damping factor and iteration cap.
53    ///
54    /// `damping` is clamped to `[0.0, 1.0]`. `iterations` is the maximum
55    /// number of power-iteration steps (convergence can terminate earlier).
56    PageRank {
57        /// Damping factor *d* in the PageRank formula (typical default: 0.85).
58        damping: f64,
59        /// Maximum number of power-iteration steps.
60        iterations: usize,
61    },
62    /// Betweenness centrality via Brandes' O(VE) BFS algorithm.
63    BetweennessCentrality {
64        /// When `true` scores are divided by `(n-1)(n-2)` for directed graphs.
65        normalized: bool,
66    },
67    /// Weakly-connected component ID for each node.
68    ///
69    /// IDs are 0-based integers converted to `f64`, stable within a single
70    /// `finalize()` call but not across separate calls.
71    ConnectedComponent,
72    /// Local clustering coefficient for each node.
73    ///
74    /// Treats the graph as *undirected* (unions out- and in-neighbours).
75    /// For a node with fewer than two neighbours the coefficient is `0.0`.
76    ClusteringCoefficient,
77    /// Degree centrality — normalised edge count per node.
78    DegreeCentrality {
79        /// Which edge direction to count.
80        direction: DegreeDirection,
81    },
82}
83
84// ── Error type ────────────────────────────────────────────────────────────────
85
/// Errors that can occur during analytics finalization.
///
/// The type is `Clone` and `PartialEq`/`Eq` so callers and tests can compare
/// errors directly (e.g. with `assert_eq!`) instead of pattern-matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GraphAnalyticsError {
    /// The accumulated edge set is empty — no nodes, no topology.
    EmptyGraph,
    /// The chosen algorithm encountered an internal error.
    ///
    /// The payload is a human-readable description. The accumulator code in
    /// this module does not currently construct this variant.
    AlgorithmError(String),
    /// A specific node IRI was not found in the accumulated graph.
    ///
    /// The payload is the IRI that was looked up. The accumulator code in this
    /// module reports a missing node as `Ok(None)` rather than through this
    /// variant.
    NodeNotFound(String),
}
96
97impl std::fmt::Display for GraphAnalyticsError {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        match self {
100            Self::EmptyGraph => f.write_str("graph analytics: accumulated edge set is empty"),
101            Self::AlgorithmError(msg) => write!(f, "graph analytics algorithm error: {msg}"),
102            Self::NodeNotFound(iri) => write!(f, "graph analytics: node not found: {iri}"),
103        }
104    }
105}
106
// Marker impl: the trait's default methods are used as-is. No variant wraps an
// underlying error value (payloads are plain `String`s), so there is no
// `source()` to expose.
impl std::error::Error for GraphAnalyticsError {}
108
109// ── Accumulator ───────────────────────────────────────────────────────────────
110
/// Heuristic test for "this string is a node IRI, not a literal".
///
/// Mirrors the rule used by [`RdfGraphAdapter`]: a leading `"` marks a literal,
/// and anything without a `:` is not considered an IRI either. Everything else
/// (including prefixed names such as `ex:A`) is accepted.
#[inline]
fn is_iri_like(s: &str) -> bool {
    if s.starts_with('"') {
        // A leading double quote marks a literal even if a colon follows.
        return false;
    }
    s.contains(':')
}
119
120/// Stateful accumulator that collects directed edges during a GROUP BY pass and
121/// runs a chosen graph-analytics algorithm at finalization time.
122///
123/// # Example
124///
125/// ```rust
126/// use oxirs_arq::analytics::graph_analytics_agg::{
127///     GraphAnalyticsAccumulator, GraphAnalyticsAggregate,
128/// };
129///
130/// let mut acc = GraphAnalyticsAccumulator::new(GraphAnalyticsAggregate::PageRank {
131///     damping: 0.85,
132///     iterations: 50,
133/// });
134/// acc.accumulate("ex:A", "ex:B");
135/// acc.accumulate("ex:B", "ex:C");
136/// acc.accumulate("ex:C", "ex:A");
137///
138/// let scores = acc.finalize().expect("PageRank should succeed");
139/// assert_eq!(scores.len(), 3);
140/// ```
141pub struct GraphAnalyticsAccumulator {
142    /// Accumulated directed edges `(subject_iri, object_iri)`.
143    edges: Vec<(String, String)>,
144    /// Distinct node IRIs seen (only IRI-like strings).
145    node_set: HashSet<String>,
146    /// Which analytics function to run at finalization.
147    kind: GraphAnalyticsAggregate,
148}
149
150impl GraphAnalyticsAccumulator {
151    /// Create a new accumulator for the given analytics kind.
152    pub fn new(kind: GraphAnalyticsAggregate) -> Self {
153        Self {
154            edges: Vec::new(),
155            node_set: HashSet::new(),
156            kind,
157        }
158    }
159
160    /// Accumulate one directed edge `(subject, object)`.
161    ///
162    /// Edges where *either* endpoint is not IRI-like are silently discarded so
163    /// that `edge_count()` / `node_count()` agree with what `finalize()` sees.
164    pub fn accumulate(&mut self, subject: &str, object: &str) {
165        if !is_iri_like(subject) || !is_iri_like(object) {
166            return; // drop literal-shaped endpoints
167        }
168        self.node_set.insert(subject.to_string());
169        self.node_set.insert(object.to_string());
170        self.edges.push((subject.to_string(), object.to_string()));
171    }
172
173    /// Number of valid (IRI-like) edges accumulated so far.
174    pub fn edge_count(&self) -> usize {
175        self.edges.len()
176    }
177
178    /// Number of distinct IRI nodes seen so far.
179    pub fn node_count(&self) -> usize {
180        self.node_set.len()
181    }
182
183    /// Run the analytics algorithm over the accumulated edge set.
184    ///
185    /// Returns a `HashMap` mapping each node IRI to its numeric score.
186    ///
187    /// # Errors
188    ///
189    /// Returns [`GraphAnalyticsError::EmptyGraph`] when no IRI-like edges have
190    /// been accumulated.
191    pub fn finalize(&mut self) -> Result<HashMap<String, f64>, GraphAnalyticsError> {
192        if self.edges.is_empty() {
193            return Err(GraphAnalyticsError::EmptyGraph);
194        }
195
196        // Build the adapter from accumulated edges using a synthetic predicate.
197        let triples: Vec<(String, String, String)> = self
198            .edges
199            .iter()
200            .map(|(s, o)| (s.clone(), "ex:aggregateEdge".to_string(), o.clone()))
201            .collect();
202        let graph = RdfGraphAdapter::from_triples(&triples);
203
204        match &self.kind.clone() {
205            GraphAnalyticsAggregate::PageRank {
206                damping,
207                iterations,
208            } => {
209                let pr = PageRank::new()
210                    .with_damping(*damping)
211                    .with_max_iter(*iterations);
212                let raw = pr.compute(&graph);
213                Ok(translate_scores(&graph, &raw))
214            }
215
216            GraphAnalyticsAggregate::BetweennessCentrality { normalized } => {
217                let bc = BetweennessCentrality {
218                    normalized: *normalized,
219                };
220                let raw = bc.compute(&graph);
221                Ok(translate_scores(&graph, &raw))
222            }
223
224            GraphAnalyticsAggregate::ConnectedComponent => {
225                let components = ConnectedComponents::weakly_connected(&graph);
226                let mut result: HashMap<String, f64> = HashMap::new();
227                for (component_id, component) in components.iter().enumerate() {
228                    for &node_id in component {
229                        if let Some(iri) = graph.get_node_iri(node_id) {
230                            result.insert(iri.to_string(), component_id as f64);
231                        }
232                    }
233                }
234                Ok(result)
235            }
236
237            GraphAnalyticsAggregate::ClusteringCoefficient => {
238                Ok(compute_clustering_coefficient(&graph))
239            }
240
241            GraphAnalyticsAggregate::DegreeCentrality { direction } => {
242                let raw = match direction {
243                    DegreeDirection::Incoming => DegreeCentrality::in_degree(&graph),
244                    DegreeDirection::Outgoing => DegreeCentrality::out_degree(&graph),
245                    DegreeDirection::Both => DegreeCentrality::total_degree(&graph),
246                };
247                Ok(translate_scores(&graph, &raw))
248            }
249        }
250    }
251
252    /// Finalize and look up a specific node's score.
253    ///
254    /// Returns `Ok(None)` when the node was not in the accumulated graph rather
255    /// than an error; only hard algorithm failures produce `Err(...)`.
256    pub fn finalize_for_node(&mut self, node: &str) -> Result<Option<f64>, GraphAnalyticsError> {
257        let scores = self.finalize()?;
258        Ok(scores.get(node).copied())
259    }
260}
261
262// ── Helpers ───────────────────────────────────────────────────────────────────
263
264/// Translate a `HashMap<NodeId, f64>` into `HashMap<String (IRI), f64>`.
265fn translate_scores(graph: &RdfGraphAdapter, raw: &HashMap<usize, f64>) -> HashMap<String, f64> {
266    raw.iter()
267        .filter_map(|(&id, &score)| graph.get_node_iri(id).map(|iri| (iri.to_string(), score)))
268        .collect()
269}
270
271/// Compute the local clustering coefficient for every node in `graph`.
272///
273/// The coefficient for node *v* is defined as the fraction of pairs among *v*'s
274/// neighbours (treating the graph as undirected) that are themselves connected:
275///
276/// ```text
277/// C(v) = |{(u, w) ∈ E : u, w ∈ N(v)}| / (k * (k - 1) / 2)
278/// ```
279///
280/// where *k* = |N(v)| and edges are counted undirectedly.  Nodes with fewer
281/// than two neighbours receive a coefficient of `0.0`.
282fn compute_clustering_coefficient(graph: &RdfGraphAdapter) -> HashMap<String, f64> {
283    let n = graph.node_count();
284    let mut result = HashMap::with_capacity(n);
285
286    for v in 0..n {
287        let iri = match graph.get_node_iri(v) {
288            Some(s) => s.to_string(),
289            None => continue,
290        };
291
292        // Build the undirected neighbour set for v
293        // (union of out-neighbours and in-neighbours, excluding v itself).
294        let mut neighbour_set: HashSet<usize> = HashSet::new();
295        for &(u, _) in &graph.adjacency[v] {
296            if u != v {
297                neighbour_set.insert(u);
298            }
299        }
300        for &(u, _) in &graph.reverse_adjacency[v] {
301            if u != v {
302                neighbour_set.insert(u);
303            }
304        }
305
306        let k = neighbour_set.len();
307        if k < 2 {
308            result.insert(iri, 0.0);
309            continue;
310        }
311
312        // Count undirected edges among neighbours.
313        // For each pair (u, w) with u < w in neighbour_set, check if an edge
314        // exists in either direction in the original directed graph.
315        let neighbours: Vec<usize> = neighbour_set.into_iter().collect();
316        let mut triangle_edges: usize = 0;
317
318        for i in 0..neighbours.len() {
319            let u = neighbours[i];
320            // Build a fast lookup for u's out-neighbours
321            let u_out: HashSet<usize> = graph.adjacency[u].iter().map(|&(w, _)| w).collect();
322            let u_in: HashSet<usize> = graph.reverse_adjacency[u].iter().map(|&(w, _)| w).collect();
323
324            for &w in neighbours.iter().skip(i + 1) {
325                // Edge exists (undirected) if u→w or w→u in the directed graph
326                if u_out.contains(&w) || u_in.contains(&w) {
327                    triangle_edges += 1;
328                }
329            }
330        }
331
332        let max_edges = (k * (k - 1)) / 2;
333        let coeff = if max_edges == 0 {
334            0.0
335        } else {
336            triangle_edges as f64 / max_edges as f64
337        };
338        result.insert(iri, coeff);
339    }
340
341    result
342}
343
344// ── Tests ─────────────────────────────────────────────────────────────────────
345
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor used by every test.
    fn make_acc(kind: GraphAnalyticsAggregate) -> GraphAnalyticsAccumulator {
        GraphAnalyticsAccumulator::new(kind)
    }

    // ── Accumulator mechanics ─────────────────────────────────────────────

    #[test]
    fn test_accumulate_tracks_edge_count() {
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Both,
        });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");
        assert_eq!(acc.edge_count(), 2);
        assert_eq!(acc.node_count(), 3);
    }

    #[test]
    fn test_accumulate_drops_literals() {
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Both,
        });
        // literal as object
        acc.accumulate("ex:A", "\"Alice\"");
        // literal as subject (no colon)
        acc.accumulate("plainstring", "ex:B");
        // valid edge
        acc.accumulate("ex:A", "ex:B");
        assert_eq!(acc.edge_count(), 1);
        assert_eq!(acc.node_count(), 2);
    }

    // ── finalize_for_node ─────────────────────────────────────────────────

    #[test]
    fn test_finalize_for_node_existing() {
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Outgoing,
        });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:A", "ex:C");
        let score = acc
            .finalize_for_node("ex:A")
            .expect("finalize should succeed")
            .expect("ex:A should have a score");
        // ex:A has out-degree 2, n=3, norm = n-1 = 2 → score = 2/2 = 1.0
        assert!((score - 1.0).abs() < 1e-9, "score={score}");
    }

    #[test]
    fn test_finalize_for_node_missing_returns_none() {
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Both,
        });
        acc.accumulate("ex:A", "ex:B");
        let result = acc
            .finalize_for_node("ex:Z")
            .expect("finalize should succeed");
        assert!(result.is_none(), "ex:Z was never accumulated");
    }

    // ── PageRank ──────────────────────────────────────────────────────────

    #[test]
    fn test_pagerank_4node_cycle_converges() {
        let mut acc = make_acc(GraphAnalyticsAggregate::PageRank {
            damping: 0.85,
            iterations: 100,
        });
        // 4-node cycle: A→B→C→D→A
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");
        acc.accumulate("ex:C", "ex:D");
        acc.accumulate("ex:D", "ex:A");

        let scores = acc.finalize().expect("PageRank should succeed");
        assert_eq!(scores.len(), 4, "4 distinct nodes expected");

        // In a symmetric cycle every node should have equal rank (~0.25)
        let expected = 1.0 / 4.0;
        for (node, &score) in &scores {
            assert!(
                (score - expected).abs() < 1e-4,
                "node {node}: expected ~{expected}, got {score}"
            );
        }

        // Scores should sum to ~1.0
        let total: f64 = scores.values().sum();
        assert!((total - 1.0).abs() < 1e-4, "sum={total}");
    }

    #[test]
    fn test_pagerank_star_center_highest() {
        let mut acc = make_acc(GraphAnalyticsAggregate::PageRank {
            damping: 0.85,
            iterations: 100,
        });
        // Spokes point into hub; hub points back to one spoke (avoids dangling)
        acc.accumulate("ex:S1", "ex:Hub");
        acc.accumulate("ex:S2", "ex:Hub");
        acc.accumulate("ex:S3", "ex:Hub");
        acc.accumulate("ex:Hub", "ex:S1");

        let scores = acc.finalize().expect("PageRank should succeed");
        let hub_score = scores["ex:Hub"];
        let s2_score = scores["ex:S2"];
        assert!(
            hub_score > s2_score,
            "hub ({hub_score}) should outrank spoke ({s2_score})"
        );
    }

    #[test]
    fn test_pagerank_empty_graph_error() {
        let mut acc = make_acc(GraphAnalyticsAggregate::PageRank {
            damping: 0.85,
            iterations: 100,
        });
        let err = acc.finalize().expect_err("empty graph should fail");
        assert!(
            matches!(err, GraphAnalyticsError::EmptyGraph),
            "expected EmptyGraph, got {err}"
        );
    }

    #[test]
    fn test_pagerank_damping_affects_rank() {
        // Build two accumulators with different damping but same graph
        let edges = [("ex:A", "ex:B"), ("ex:B", "ex:C"), ("ex:C", "ex:A")];

        let mut acc1 = make_acc(GraphAnalyticsAggregate::PageRank {
            damping: 0.5,
            iterations: 200,
        });
        let mut acc2 = make_acc(GraphAnalyticsAggregate::PageRank {
            damping: 0.95,
            iterations: 200,
        });

        for (s, o) in &edges {
            acc1.accumulate(s, o);
            acc2.accumulate(s, o);
        }

        let s1 = acc1.finalize().expect("should succeed");
        let s2 = acc2.finalize().expect("should succeed");

        // The graph is a symmetric ring, so per-node ranks cannot be used to
        // tell the two damping factors apart. What is checked instead is the
        // damping-independent invariant: each run's scores still sum to ~1.0.
        let sum1: f64 = s1.values().sum();
        let sum2: f64 = s2.values().sum();
        assert!((sum1 - 1.0).abs() < 1e-4, "d=0.5 sum={sum1}");
        assert!((sum2 - 1.0).abs() < 1e-4, "d=0.95 sum={sum2}");
    }

    // ── Betweenness centrality ────────────────────────────────────────────

    #[test]
    fn test_betweenness_bridge_node_highest() {
        // A → B → C  linear chain: B is the only bridge
        let mut acc = make_acc(GraphAnalyticsAggregate::BetweennessCentrality { normalized: true });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");

        let scores = acc.finalize().expect("should succeed");
        let b_score = scores["ex:B"];
        let a_score = scores["ex:A"];
        let c_score = scores["ex:C"];

        assert!(
            b_score > a_score,
            "bridge B ({b_score}) should beat A ({a_score})"
        );
        assert!(
            b_score > c_score,
            "bridge B ({b_score}) should beat C ({c_score})"
        );
    }

    #[test]
    fn test_betweenness_complete_graph_equal() {
        // K3 directed complete graph: all betweenness equal
        let mut acc = make_acc(GraphAnalyticsAggregate::BetweennessCentrality { normalized: true });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:A", "ex:C");
        acc.accumulate("ex:B", "ex:A");
        acc.accumulate("ex:B", "ex:C");
        acc.accumulate("ex:C", "ex:A");
        acc.accumulate("ex:C", "ex:B");

        let scores = acc.finalize().expect("should succeed");
        let vals: Vec<f64> = scores.values().copied().collect();
        let first = vals[0];
        for &v in &vals {
            assert!(
                (v - first).abs() < 1e-9,
                "complete graph: not equal {v} vs {first}"
            );
        }
    }

    #[test]
    fn test_betweenness_normalized_in_range() {
        let mut acc = make_acc(GraphAnalyticsAggregate::BetweennessCentrality { normalized: true });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");
        acc.accumulate("ex:C", "ex:D");
        acc.accumulate("ex:D", "ex:E");

        let scores = acc.finalize().expect("should succeed");
        for (node, &score) in &scores {
            assert!(
                (0.0..=1.0).contains(&score),
                "node {node} score {score} out of [0,1]"
            );
        }
    }

    // ── Connected components ──────────────────────────────────────────────

    #[test]
    fn test_connected_component_single_component() {
        let mut acc = make_acc(GraphAnalyticsAggregate::ConnectedComponent);
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");

        let scores = acc.finalize().expect("should succeed");
        // All three nodes belong to component 0
        for &id in scores.values() {
            assert_eq!(id as usize, 0, "all nodes in component 0");
        }
        assert_eq!(scores.len(), 3);
    }

    #[test]
    fn test_connected_component_two_components() {
        let mut acc = make_acc(GraphAnalyticsAggregate::ConnectedComponent);
        acc.accumulate("ex:A", "ex:B"); // component 0
        acc.accumulate("ex:C", "ex:D"); // component 1

        let scores = acc.finalize().expect("should succeed");
        let ids: HashSet<usize> = scores.values().map(|&v| v as usize).collect();
        assert_eq!(ids.len(), 2, "exactly 2 distinct component IDs expected");
    }

    #[test]
    fn test_connected_component_isolated_node() {
        // `accumulate` only ever adds nodes as edge endpoints, so a truly
        // isolated node cannot be created through it; the closest case is a
        // second edge pair that shares no node with the first.
        let mut acc = make_acc(GraphAnalyticsAggregate::ConnectedComponent);
        acc.accumulate("ex:A", "ex:B");
        // ex:C / ex:D are not connected to ex:A / ex:B in either direction
        acc.accumulate("ex:C", "ex:D");

        let scores = acc.finalize().expect("should succeed");
        // Two separate components: {A,B} and {C,D}
        let ids: HashSet<usize> = scores.values().map(|&v| v as usize).collect();
        assert_eq!(ids.len(), 2);
    }

    // ── Clustering coefficient ────────────────────────────────────────────

    #[test]
    fn test_clustering_complete_graph_is_one() {
        // Triangle K3 (undirected via directed edges): every pair is connected
        let mut acc = make_acc(GraphAnalyticsAggregate::ClusteringCoefficient);
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");
        acc.accumulate("ex:A", "ex:C");

        let scores = acc.finalize().expect("should succeed");
        // Guard against the loop below passing vacuously on an empty map.
        assert_eq!(scores.len(), 3, "one coefficient per node expected");
        for (node, &coeff) in &scores {
            assert!(
                (coeff - 1.0).abs() < 1e-9,
                "K3 node {node}: expected 1.0, got {coeff}"
            );
        }
    }

    #[test]
    fn test_clustering_star_is_zero() {
        // Star graph: center → 3 spokes.  Spokes are not connected to each other.
        let mut acc = make_acc(GraphAnalyticsAggregate::ClusteringCoefficient);
        acc.accumulate("ex:Hub", "ex:S1");
        acc.accumulate("ex:Hub", "ex:S2");
        acc.accumulate("ex:Hub", "ex:S3");

        let scores = acc.finalize().expect("should succeed");
        // S1, S2, S3 each have exactly 1 neighbour → coefficient = 0
        for spoke in &["ex:S1", "ex:S2", "ex:S3"] {
            let coeff = scores[*spoke];
            assert!(coeff < 1e-9, "star leaf {spoke}: expected 0.0, got {coeff}");
        }
        // The hub has 3 neighbours but none of them are linked → 0 of 3 pairs
        let hub = scores["ex:Hub"];
        assert!(hub < 1e-9, "star hub: expected 0.0, got {hub}");
    }

    // ── Degree centrality ─────────────────────────────────────────────────

    #[test]
    fn test_degree_outgoing_count() {
        // ex:A has 2 out-edges; ex:B has 1; ex:C has 0.
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Outgoing,
        });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:A", "ex:C");
        acc.accumulate("ex:B", "ex:C");

        let scores = acc.finalize().expect("should succeed");
        let a = scores["ex:A"];
        let b = scores["ex:B"];
        let c = scores["ex:C"];
        // n=3, norm=2; A:2/2=1.0, B:1/2=0.5, C:0/2=0.0
        assert!((a - 1.0).abs() < 1e-9, "A out={a}");
        assert!((b - 0.5).abs() < 1e-9, "B out={b}");
        assert!(c.abs() < 1e-9, "C out={c}");
    }

    #[test]
    fn test_degree_incoming_count() {
        // ex:C receives 2 in-edges; ex:B receives 1; ex:A receives 0.
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Incoming,
        });
        acc.accumulate("ex:A", "ex:C");
        acc.accumulate("ex:B", "ex:C");
        acc.accumulate("ex:A", "ex:B");

        let scores = acc.finalize().expect("should succeed");
        let c_score = scores["ex:C"];
        let a_score = scores["ex:A"];
        assert!(
            c_score > a_score,
            "C in={c_score} should exceed A in={a_score}"
        );
    }

    #[test]
    fn test_degree_both_sum() {
        // ex:A→B, B→C: A has out=1 in=0; B has out=1 in=1; C has out=0 in=1.
        let mut acc = make_acc(GraphAnalyticsAggregate::DegreeCentrality {
            direction: DegreeDirection::Both,
        });
        acc.accumulate("ex:A", "ex:B");
        acc.accumulate("ex:B", "ex:C");

        let scores = acc.finalize().expect("should succeed");
        let b = scores["ex:B"];
        let a = scores["ex:A"];
        let c = scores["ex:C"];
        // B has in+out=2 (highest), A and C have 1 each
        assert!(b > a, "B total={b} should exceed A total={a}");
        assert!(b > c, "B total={b} should exceed C total={c}");
        assert!((a - c).abs() < 1e-9, "A and C should be equal: a={a} c={c}");
    }
}