use std::collections::VecDeque;
use std::sync::OnceLock;

use graphos_common::types::{NodeId, Value};
use graphos_common::utils::error::Result;
use graphos_common::utils::hash::FxHashMap;
use graphos_core::graph::Direction;
use graphos_core::graph::lpg::LpgStore;

use super::super::{AlgorithmResult, ParameterDef, ParameterType, Parameters};
use super::traits::{GraphAlgorithm, NodeValueResultBuilder};

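/// Per-node in-, out-, and total-degree counts produced by [`degree_centrality`].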
#[derive(Debug, Clone)]
pub struct DegreeCentralityResult {
    pub in_degree: FxHashMap<NodeId, usize>,
    pub out_degree: FxHashMap<NodeId, usize>,
    pub total_degree: FxHashMap<NodeId, usize>,
}

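/// Computes in-, out-, and total degree for every node in `store`.
///
/// Each node's outgoing edges are walked once: the edge count becomes the
/// node's out-degree, and every target node has its in-degree incremented.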
pub fn degree_centrality(store: &LpgStore) -> DegreeCentralityResult {
    let mut in_degree: FxHashMap<NodeId, usize> = FxHashMap::default();
    let mut out_degree: FxHashMap<NodeId, usize> = FxHashMap::default();

    let nodes = store.node_ids();

    for &node in &nodes {
        in_degree.insert(node, 0);
        out_degree.insert(node, 0);
    }

    for &node in &nodes {
        let out_count = store.edges_from(node, Direction::Outgoing).count();
        out_degree.insert(node, out_count);

        for (neighbor, _) in store.edges_from(node, Direction::Outgoing) {
            *in_degree.entry(neighbor).or_insert(0) += 1;
        }
    }

    let total_degree: FxHashMap<NodeId, usize> = nodes
        .iter()
        .map(|&n| {
            let in_d = *in_degree.get(&n).unwrap_or(&0);
            let out_d = *out_degree.get(&n).unwrap_or(&0);
            (n, in_d + out_d)
        })
        .collect();

    DegreeCentralityResult {
        in_degree,
        out_degree,
        total_degree,
    }
}

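/// Degree centrality with each node's total degree divided by `n - 1`
/// (the conventional normalizer). Returns 0.0 for every node when the
/// graph has at most one node.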
pub fn degree_centrality_normalized(store: &LpgStore) -> FxHashMap<NodeId, f64> {
    let result = degree_centrality(store);
    let n = result.total_degree.len();

    if n <= 1 {
        return result
            .total_degree
            .into_iter()
            .map(|(k, _)| (k, 0.0))
            .collect();
    }

    let norm = (n - 1) as f64;
    result
        .total_degree
        .into_iter()
        .map(|(k, v)| (k, v as f64 / norm))
        .collect()
}

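/// Power-iteration PageRank over the directed graph in `store`.
///
/// Scores start uniform at `1/n` and are updated until either the largest
/// per-node change falls below `tolerance` or `max_iterations` is reached.
/// Dangling nodes (no outgoing edges) spread their mass uniformly, so the
/// returned scores always sum to 1.
///
/// A minimal usage sketch, mirroring the unit tests below (marked `ignore`
/// because doctest paths are not wired up for this module):
///
/// ```ignore
/// let store = LpgStore::new();
/// let a = store.create_node(&["Node"]);
/// let b = store.create_node(&["Node"]);
/// store.create_edge(a, b, "LINK");
///
/// let scores = pagerank(&store, 0.85, 100, 1e-6);
/// assert!((scores.values().sum::<f64>() - 1.0).abs() < 1e-3);
/// ```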
pub fn pagerank(
    store: &LpgStore,
    damping: f64,
    max_iterations: usize,
    tolerance: f64,
) -> FxHashMap<NodeId, f64> {
    let nodes = store.node_ids();
    let n = nodes.len();

    if n == 0 {
        return FxHashMap::default();
    }

    let mut node_to_idx: FxHashMap<NodeId, usize> = FxHashMap::default();
    for (idx, &node) in nodes.iter().enumerate() {
        node_to_idx.insert(node, idx);
    }

    let mut out_edges: Vec<Vec<usize>> = vec![Vec::new(); n];
    let mut out_degree: Vec<usize> = vec![0; n];

    for (idx, &node) in nodes.iter().enumerate() {
        let edges: Vec<usize> = store
            .edges_from(node, Direction::Outgoing)
            .filter_map(|(neighbor, _)| node_to_idx.get(&neighbor).copied())
            .collect();
        out_degree[idx] = edges.len();
        out_edges[idx] = edges;
    }

    let initial_score = 1.0 / n as f64;
    let mut scores = vec![initial_score; n];
    let mut new_scores = vec![0.0; n];

    let dangling: Vec<usize> = (0..n).filter(|&i| out_degree[i] == 0).collect();

    for _ in 0..max_iterations {
        let dangling_sum: f64 = dangling.iter().map(|&i| scores[i]).sum();
        let dangling_contrib = damping * dangling_sum / n as f64;

        let teleport = (1.0 - damping) / n as f64;
        for score in &mut new_scores {
            *score = teleport + dangling_contrib;
        }

        for (i, edges) in out_edges.iter().enumerate() {
            if !edges.is_empty() {
                let contrib = damping * scores[i] / edges.len() as f64;
                for &j in edges {
                    new_scores[j] += contrib;
                }
            }
        }

        let max_diff: f64 = scores
            .iter()
            .zip(new_scores.iter())
            .map(|(old, new)| (old - new).abs())
            .fold(0.0, f64::max);

        std::mem::swap(&mut scores, &mut new_scores);

        if max_diff < tolerance {
            break;
        }
    }

    nodes
        .into_iter()
        .enumerate()
        .map(|(idx, node)| (node, scores[idx]))
        .collect()
}

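/// Betweenness centrality via Brandes' algorithm: one BFS per source node
/// followed by a reverse pass that accumulates path dependencies. With
/// `normalized` set, scores are scaled by `2 / ((n - 1) * (n - 2))`.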
pub fn betweenness_centrality(store: &LpgStore, normalized: bool) -> FxHashMap<NodeId, f64> {
    let nodes = store.node_ids();
    let n = nodes.len();

    let mut centrality: FxHashMap<NodeId, f64> = FxHashMap::default();
    for &node in &nodes {
        centrality.insert(node, 0.0);
    }

    if n <= 2 {
        return centrality;
    }

    for &source in &nodes {
        let mut stack: Vec<NodeId> = Vec::new();
        let mut predecessors: FxHashMap<NodeId, Vec<NodeId>> = FxHashMap::default();
        let mut sigma: FxHashMap<NodeId, f64> = FxHashMap::default();
        let mut dist: FxHashMap<NodeId, i64> = FxHashMap::default();

        for &node in &nodes {
            predecessors.insert(node, Vec::new());
            sigma.insert(node, 0.0);
            dist.insert(node, -1);
        }
        sigma.insert(source, 1.0);
        dist.insert(source, 0);

        let mut queue: VecDeque<NodeId> = VecDeque::new();
        queue.push_back(source);

        while let Some(v) = queue.pop_front() {
            stack.push(v);
            let dist_v = *dist.get(&v).unwrap();

            for (w, _) in store.edges_from(v, Direction::Outgoing) {
                if *dist.get(&w).unwrap() < 0 {
                    dist.insert(w, dist_v + 1);
                    queue.push_back(w);
                }

                if *dist.get(&w).unwrap() == dist_v + 1 {
                    let sigma_v = *sigma.get(&v).unwrap();
                    *sigma.entry(w).or_insert(0.0) += sigma_v;
                    predecessors.entry(w).or_default().push(v);
                }
            }
        }

        let mut delta: FxHashMap<NodeId, f64> = FxHashMap::default();
        for &node in &nodes {
            delta.insert(node, 0.0);
        }

        while let Some(w) = stack.pop() {
            if w == source {
                continue;
            }

            let sigma_w = *sigma.get(&w).unwrap();
            let delta_w = *delta.get(&w).unwrap();

            for v in predecessors.get(&w).unwrap_or(&Vec::new()) {
                let sigma_v = *sigma.get(v).unwrap();
                let coeff = (sigma_v / sigma_w) * (1.0 + delta_w);
                *delta.entry(*v).or_insert(0.0) += coeff;
            }

            *centrality.entry(w).or_insert(0.0) += delta_w;
        }
    }

    if normalized && n > 2 {
        let norm = 2.0 / ((n - 1) * (n - 2)) as f64;
        for v in centrality.values_mut() {
            *v *= norm;
        }
    }

    centrality
}

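/// Closeness centrality from BFS distances along outgoing edges.
///
/// The standard score is `reachable / total_distance`; with `wf_improved`,
/// the Wasserman-Faust variant additionally scales by `reachable / (n - 1)`
/// so nodes that reach only a small component are not over-rewarded.
/// Nodes that reach nothing score 0.0.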
pub fn closeness_centrality(store: &LpgStore, wf_improved: bool) -> FxHashMap<NodeId, f64> {
    let nodes = store.node_ids();
    let n = nodes.len();

    let mut centrality: FxHashMap<NodeId, f64> = FxHashMap::default();

    if n <= 1 {
        for &node in &nodes {
            centrality.insert(node, 0.0);
        }
        return centrality;
    }

    for &source in &nodes {
        let mut dist: FxHashMap<NodeId, usize> = FxHashMap::default();
        let mut queue: VecDeque<NodeId> = VecDeque::new();

        dist.insert(source, 0);
        queue.push_back(source);

        while let Some(v) = queue.pop_front() {
            let dist_v = *dist.get(&v).unwrap();

            for (w, _) in store.edges_from(v, Direction::Outgoing) {
                if !dist.contains_key(&w) {
                    dist.insert(w, dist_v + 1);
                    queue.push_back(w);
                }
            }
        }

        let reachable = dist.len() - 1;
        let total_dist: usize = dist.values().sum();

        let closeness = if total_dist > 0 && reachable > 0 {
            if wf_improved {
                let reachable_f = reachable as f64;
                let n_minus_1 = (n - 1) as f64;
                (reachable_f / n_minus_1) * (reachable_f / total_dist as f64)
            } else {
                reachable as f64 / total_dist as f64
            }
        } else {
            0.0
        };

        centrality.insert(source, closeness);
    }

    centrality
}

static PAGERANK_PARAMS: OnceLock<Vec<ParameterDef>> = OnceLock::new();

fn pagerank_params() -> &'static [ParameterDef] {
    PAGERANK_PARAMS.get_or_init(|| {
        vec![
            ParameterDef {
                name: "damping".to_string(),
                description: "Damping factor (default: 0.85)".to_string(),
                param_type: ParameterType::Float,
                required: false,
                default: Some("0.85".to_string()),
            },
            ParameterDef {
                name: "max_iterations".to_string(),
                description: "Maximum iterations (default: 100)".to_string(),
                param_type: ParameterType::Integer,
                required: false,
                default: Some("100".to_string()),
            },
            ParameterDef {
                name: "tolerance".to_string(),
                description: "Convergence tolerance (default: 1e-6)".to_string(),
                param_type: ParameterType::Float,
                required: false,
                default: Some("1e-6".to_string()),
            },
        ]
    })
}

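/// `GraphAlgorithm` implementation exposing [`pagerank`] under the name
/// `"pagerank"`, with `damping`, `max_iterations`, and `tolerance` parameters.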
pub struct PageRankAlgorithm;

impl GraphAlgorithm for PageRankAlgorithm {
    fn name(&self) -> &str {
        "pagerank"
    }

    fn description(&self) -> &str {
        "PageRank algorithm for measuring node importance"
    }

    fn parameters(&self) -> &[ParameterDef] {
        pagerank_params()
    }

    fn execute(&self, store: &LpgStore, params: &Parameters) -> Result<AlgorithmResult> {
        let damping = params.get_float("damping").unwrap_or(0.85);
        let max_iter = params.get_int("max_iterations").unwrap_or(100) as usize;
        let tolerance = params.get_float("tolerance").unwrap_or(1e-6);

        let scores = pagerank(store, damping, max_iter, tolerance);

        let mut builder = NodeValueResultBuilder::with_capacity("pagerank", scores.len());
        for (node, score) in scores {
            builder.push(node, Value::Float64(score));
        }

        Ok(builder.build())
    }
}

static BETWEENNESS_PARAMS: OnceLock<Vec<ParameterDef>> = OnceLock::new();

fn betweenness_params() -> &'static [ParameterDef] {
    BETWEENNESS_PARAMS.get_or_init(|| {
        vec![ParameterDef {
            name: "normalized".to_string(),
            description: "Normalize scores (default: true)".to_string(),
            param_type: ParameterType::Boolean,
            required: false,
            default: Some("true".to_string()),
        }]
    })
}

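/// `GraphAlgorithm` implementation exposing [`betweenness_centrality`] under
/// the name `"betweenness_centrality"`, with a `normalized` parameter
/// (default `true`).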
pub struct BetweennessCentralityAlgorithm;

impl GraphAlgorithm for BetweennessCentralityAlgorithm {
    fn name(&self) -> &str {
        "betweenness_centrality"
    }

    fn description(&self) -> &str {
        "Betweenness centrality using Brandes' algorithm"
    }

    fn parameters(&self) -> &[ParameterDef] {
        betweenness_params()
    }

    fn execute(&self, store: &LpgStore, params: &Parameters) -> Result<AlgorithmResult> {
        let normalized = params.get_bool("normalized").unwrap_or(true);

        let scores = betweenness_centrality(store, normalized);

        let mut builder = NodeValueResultBuilder::with_capacity("betweenness", scores.len());
        for (node, score) in scores {
            builder.push(node, Value::Float64(score));
        }

        Ok(builder.build())
    }
}

static CLOSENESS_PARAMS: OnceLock<Vec<ParameterDef>> = OnceLock::new();

fn closeness_params() -> &'static [ParameterDef] {
    CLOSENESS_PARAMS.get_or_init(|| {
        vec![ParameterDef {
            name: "wf_improved".to_string(),
            description: "Use Wasserman-Faust formula for disconnected graphs (default: false)"
                .to_string(),
            param_type: ParameterType::Boolean,
            required: false,
            default: Some("false".to_string()),
        }]
    })
}

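/// `GraphAlgorithm` implementation exposing [`closeness_centrality`] under
/// the name `"closeness_centrality"`, with a `wf_improved` parameter
/// (default `false`).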
pub struct ClosenessCentralityAlgorithm;

impl GraphAlgorithm for ClosenessCentralityAlgorithm {
    fn name(&self) -> &str {
        "closeness_centrality"
    }

    fn description(&self) -> &str {
        "Closeness centrality based on shortest path distances"
    }

    fn parameters(&self) -> &[ParameterDef] {
        closeness_params()
    }

    fn execute(&self, store: &LpgStore, params: &Parameters) -> Result<AlgorithmResult> {
        let wf_improved = params.get_bool("wf_improved").unwrap_or(false);

        let scores = closeness_centrality(store, wf_improved);

        let mut builder = NodeValueResultBuilder::with_capacity("closeness", scores.len());
        for (node, score) in scores {
            builder.push(node, Value::Float64(score));
        }

        Ok(builder.build())
    }
}

static DEGREE_PARAMS: OnceLock<Vec<ParameterDef>> = OnceLock::new();

fn degree_params() -> &'static [ParameterDef] {
    DEGREE_PARAMS.get_or_init(|| {
        vec![ParameterDef {
            name: "normalized".to_string(),
            description: "Normalize by (n-1) (default: false)".to_string(),
            param_type: ParameterType::Boolean,
            required: false,
            default: Some("false".to_string()),
        }]
    })
}

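/// `GraphAlgorithm` implementation exposing degree centrality: a per-node
/// in/out/total degree table by default, or a single normalized score column
/// when the `normalized` parameter is set.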
pub struct DegreeCentralityAlgorithm;

impl GraphAlgorithm for DegreeCentralityAlgorithm {
    fn name(&self) -> &str {
        "degree_centrality"
    }

    fn description(&self) -> &str {
        "Degree centrality (node connectivity measure)"
    }

    fn parameters(&self) -> &[ParameterDef] {
        degree_params()
    }

    fn execute(&self, store: &LpgStore, params: &Parameters) -> Result<AlgorithmResult> {
        let normalized = params.get_bool("normalized").unwrap_or(false);

        if normalized {
            let scores = degree_centrality_normalized(store);

            let mut builder =
                NodeValueResultBuilder::with_capacity("degree_centrality", scores.len());
            for (node, score) in scores {
                builder.push(node, Value::Float64(score));
            }
            Ok(builder.build())
        } else {
            let result = degree_centrality(store);

            let mut output = AlgorithmResult::new(vec![
                "node_id".to_string(),
                "in_degree".to_string(),
                "out_degree".to_string(),
                "total_degree".to_string(),
            ]);

            for (&node, &total) in &result.total_degree {
                let in_d = *result.in_degree.get(&node).unwrap_or(&0);
                let out_d = *result.out_degree.get(&node).unwrap_or(&0);

                output.add_row(vec![
                    Value::Int64(node.0 as i64),
                    Value::Int64(in_d as i64),
                    Value::Int64(out_d as i64),
                    Value::Int64(total as i64),
                ]);
            }

            Ok(output)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn create_test_graph() -> LpgStore {
        let store = LpgStore::new();

        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        let n2 = store.create_node(&["Node"]);
        let n3 = store.create_node(&["Node"]);
        let n4 = store.create_node(&["Node"]);

        store.create_edge(n0, n1, "EDGE");
        store.create_edge(n0, n3, "EDGE");
        store.create_edge(n1, n2, "EDGE");
        store.create_edge(n1, n4, "EDGE");
        store.create_edge(n3, n4, "EDGE");

        store
    }

    fn create_pagerank_graph() -> LpgStore {
        let store = LpgStore::new();

        let a = store.create_node(&["Node"]);
        let b = store.create_node(&["Node"]);
        let c = store.create_node(&["Node"]);

        store.create_edge(a, b, "LINK");
        store.create_edge(b, c, "LINK");
        store.create_edge(a, c, "LINK");

        store
    }

    #[test]
    fn test_degree_centrality() {
        let store = create_test_graph();
        let result = degree_centrality(&store);

        assert_eq!(*result.out_degree.get(&NodeId::new(0)).unwrap(), 2);

        assert_eq!(*result.out_degree.get(&NodeId::new(4)).unwrap(), 0);
        assert_eq!(*result.in_degree.get(&NodeId::new(4)).unwrap(), 2);
    }

    #[test]
    fn test_degree_centrality_normalized() {
        let store = create_test_graph();
        let result = degree_centrality_normalized(&store);

        for (_, &score) in &result {
            assert!(score >= 0.0 && score <= 1.0);
        }
    }

    #[test]
    fn test_pagerank_basic() {
        let store = create_pagerank_graph();
        let scores = pagerank(&store, 0.85, 100, 1e-6);

        assert_eq!(scores.len(), 3);

        for (_, &score) in &scores {
            assert!(score > 0.0);
        }

        let total: f64 = scores.values().sum();
        assert!((total - 1.0).abs() < 0.01);
    }

    #[test]
    fn test_pagerank_dangling() {
        let store = LpgStore::new();
        let a = store.create_node(&["Node"]);
        let b = store.create_node(&["Node"]);
        store.create_edge(a, b, "EDGE");

        let scores = pagerank(&store, 0.85, 100, 1e-6);
        assert_eq!(scores.len(), 2);

        assert!(*scores.get(&b).unwrap() > 0.0);
    }

    #[test]
    fn test_pagerank_empty() {
        let store = LpgStore::new();
        let scores = pagerank(&store, 0.85, 100, 1e-6);
        assert!(scores.is_empty());
    }

    #[test]
    fn test_betweenness_centrality() {
        let store = create_test_graph();
        let scores = betweenness_centrality(&store, false);

        assert_eq!(scores.len(), 5);

        for (_, &score) in &scores {
            assert!(score >= 0.0);
        }
    }

    #[test]
    fn test_betweenness_centrality_normalized() {
        let store = create_test_graph();
        let scores = betweenness_centrality(&store, true);

        for (_, &score) in &scores {
            assert!(score >= 0.0);
        }
    }

    #[test]
    fn test_closeness_centrality() {
        let store = create_test_graph();
        let scores = closeness_centrality(&store, false);

        assert_eq!(scores.len(), 5);

        for (_, &score) in &scores {
            assert!(score >= 0.0);
        }

        assert!(*scores.get(&NodeId::new(0)).unwrap() > 0.0);
    }

    #[test]
    fn test_closeness_wf_improved() {
        let store = create_test_graph();
        let scores_standard = closeness_centrality(&store, false);
        let scores_wf = closeness_centrality(&store, true);

        for (node, &wf_score) in &scores_wf {
            assert!(wf_score >= 0.0);
            let _std_score = scores_standard.get(node).unwrap();
        }
    }

    #[test]
    fn test_closeness_disconnected() {
        let store = LpgStore::new();
        let a = store.create_node(&["Node"]);
        let _b = store.create_node(&["Node"]);

        let scores = closeness_centrality(&store, false);

        assert_eq!(*scores.get(&a).unwrap(), 0.0);
    }

    #[test]
    fn test_single_node() {
        let store = LpgStore::new();
        store.create_node(&["Node"]);

        let degree = degree_centrality(&store);
        assert_eq!(degree.total_degree.len(), 1);

        let pr = pagerank(&store, 0.85, 100, 1e-6);
        assert_eq!(pr.len(), 1);

        let bc = betweenness_centrality(&store, false);
        assert_eq!(bc.len(), 1);

        let cc = closeness_centrality(&store, false);
        assert_eq!(cc.len(), 1);
    }
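
    // Additional sketch: spot-check the normalized degree values on the shared
    // fixture. Like the tests above, this assumes node ids are assigned in
    // creation order starting at 0.
    #[test]
    fn test_degree_centrality_normalized_values() {
        let store = create_test_graph();
        let scores = degree_centrality_normalized(&store);

        // Node 0 has out-degree 2 and in-degree 0; with n = 5 the normalizer
        // is n - 1 = 4, so its score is 2 / 4 = 0.5.
        let n0 = *scores.get(&NodeId::new(0)).unwrap();
        assert!((n0 - 0.5).abs() < 1e-9);

        // Node 1 has out-degree 2 and in-degree 1, giving 3 / 4 = 0.75.
        let n1 = *scores.get(&NodeId::new(1)).unwrap();
        assert!((n1 - 0.75).abs() < 1e-9);
    }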
}