1#[cfg(feature = "parallel")]
7use std::sync::Arc;
8use std::sync::OnceLock;
9
10use grafeo_common::types::{NodeId, Value};
11use grafeo_common::utils::error::Result;
12use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::lpg::LpgStore;
15#[cfg(feature = "parallel")]
16use rayon::prelude::*;
17
18use super::super::{AlgorithmResult, ParameterDef, ParameterType, Parameters};
19use super::traits::GraphAlgorithm;
20#[cfg(feature = "parallel")]
21use super::traits::ParallelGraphAlgorithm;
22
23#[derive(Debug, Clone)]
29pub struct ClusteringCoefficientResult {
30 pub coefficients: FxHashMap<NodeId, f64>,
32 pub triangle_counts: FxHashMap<NodeId, u64>,
34 pub total_triangles: u64,
36 pub global_coefficient: f64,
38}
39
40fn build_undirected_neighbors(store: &LpgStore) -> FxHashMap<NodeId, FxHashSet<NodeId>> {
48 let nodes = store.node_ids();
49 let mut neighbors: FxHashMap<NodeId, FxHashSet<NodeId>> = FxHashMap::default();
50
51 for &node in &nodes {
53 neighbors.insert(node, FxHashSet::default());
54 }
55
56 for &node in &nodes {
58 for (neighbor, _) in store.edges_from(node, Direction::Outgoing) {
60 if let Some(set) = neighbors.get_mut(&node) {
61 set.insert(neighbor);
62 }
63 if let Some(set) = neighbors.get_mut(&neighbor) {
65 set.insert(node);
66 }
67 }
68
69 for (neighbor, _) in store.edges_from(node, Direction::Incoming) {
71 if let Some(set) = neighbors.get_mut(&node) {
72 set.insert(neighbor);
73 }
74 if let Some(set) = neighbors.get_mut(&neighbor) {
75 set.insert(node);
76 }
77 }
78 }
79
80 neighbors
81}
82
83fn count_node_triangles(
85 node_neighbors: &FxHashSet<NodeId>,
86 all_neighbors: &FxHashMap<NodeId, FxHashSet<NodeId>>,
87) -> u64 {
88 let neighbor_list: Vec<NodeId> = node_neighbors.iter().copied().collect();
89 let k = neighbor_list.len();
90 let mut triangles = 0u64;
91
92 for i in 0..k {
94 for j in (i + 1)..k {
95 let u = neighbor_list[i];
96 let w = neighbor_list[j];
97
98 if let Some(u_neighbors) = all_neighbors.get(&u)
100 && u_neighbors.contains(&w)
101 {
102 triangles += 1;
103 }
104 }
105 }
106
107 triangles
108}
109
110pub fn triangle_count(store: &LpgStore) -> FxHashMap<NodeId, u64> {
131 let neighbors = build_undirected_neighbors(store);
132 let mut counts: FxHashMap<NodeId, u64> = FxHashMap::default();
133
134 for (&node, node_neighbors) in &neighbors {
135 let triangles = count_node_triangles(node_neighbors, &neighbors);
136 counts.insert(node, triangles);
137 }
138
139 counts
140}
141
142pub fn local_clustering_coefficient(store: &LpgStore) -> FxHashMap<NodeId, f64> {
163 let neighbors = build_undirected_neighbors(store);
164 let mut coefficients: FxHashMap<NodeId, f64> = FxHashMap::default();
165
166 for (&node, node_neighbors) in &neighbors {
167 let k = node_neighbors.len();
168
169 if k < 2 {
170 coefficients.insert(node, 0.0);
172 } else {
173 let triangles = count_node_triangles(node_neighbors, &neighbors);
174 let max_triangles = (k * (k - 1)) / 2;
175 let coefficient = triangles as f64 / max_triangles as f64;
176 coefficients.insert(node, coefficient);
177 }
178 }
179
180 coefficients
181}
182
183pub fn global_clustering_coefficient(store: &LpgStore) -> f64 {
200 let local = local_clustering_coefficient(store);
201
202 if local.is_empty() {
203 return 0.0;
204 }
205
206 let sum: f64 = local.values().sum();
207 sum / local.len() as f64
208}
209
210pub fn total_triangles(store: &LpgStore) -> u64 {
226 let per_node = triangle_count(store);
227 per_node.values().sum::<u64>() / 3
229}
230
231pub fn clustering_coefficient(store: &LpgStore) -> ClusteringCoefficientResult {
249 let neighbors = build_undirected_neighbors(store);
250 let n = neighbors.len();
251
252 let mut coefficients: FxHashMap<NodeId, f64> = FxHashMap::default();
253 let mut triangle_counts: FxHashMap<NodeId, u64> = FxHashMap::default();
254
255 for (&node, node_neighbors) in &neighbors {
256 let k = node_neighbors.len();
257 let triangles = count_node_triangles(node_neighbors, &neighbors);
258
259 triangle_counts.insert(node, triangles);
260
261 let coefficient = if k < 2 {
262 0.0
263 } else {
264 let max_triangles = (k * (k - 1)) / 2;
265 triangles as f64 / max_triangles as f64
266 };
267 coefficients.insert(node, coefficient);
268 }
269
270 let total_triangles = triangle_counts.values().sum::<u64>() / 3;
271 let global_coefficient = if n == 0 {
272 0.0
273 } else {
274 coefficients.values().sum::<f64>() / n as f64
275 };
276
277 ClusteringCoefficientResult {
278 coefficients,
279 triangle_counts,
280 total_triangles,
281 global_coefficient,
282 }
283}
284
/// Parallel variant of [`clustering_coefficient`].
///
/// The undirected adjacency is built once. Graphs with fewer than
/// `parallel_threshold` nodes are then processed sequentially on the calling
/// thread; larger graphs are split across rayon workers, each filling its own
/// pair of maps that are merged at the end. Both paths produce the same
/// per-node values as the sequential function.
#[cfg(feature = "parallel")]
pub fn clustering_coefficient_parallel(
    store: &LpgStore,
    parallel_threshold: usize,
) -> ClusteringCoefficientResult {
    /// Triangle count and local coefficient of one node.
    fn node_metrics(
        node_neighbors: &FxHashSet<NodeId>,
        all_neighbors: &FxHashMap<NodeId, FxHashSet<NodeId>>,
    ) -> (u64, f64) {
        let triangles = count_node_triangles(node_neighbors, all_neighbors);
        // Widen before multiplying so the pair count cannot wrap on 32-bit
        // targets for high-degree nodes.
        let k = node_neighbors.len() as u64;
        let coefficient = if k < 2 {
            0.0
        } else {
            let possible_pairs = k * (k - 1) / 2;
            triangles as f64 / possible_pairs as f64
        };
        (triangles, coefficient)
    }

    let neighbors = build_undirected_neighbors(store);
    let n = neighbors.len();

    let (coefficients, triangle_counts): (FxHashMap<NodeId, f64>, FxHashMap<NodeId, u64>) =
        if n < parallel_threshold {
            // Small graph: stay sequential, but reuse the adjacency that was
            // just built instead of scanning the store a second time.
            let mut coeffs: FxHashMap<NodeId, f64> = FxHashMap::default();
            let mut triangles: FxHashMap<NodeId, u64> = FxHashMap::default();
            for (&node, node_neighbors) in &neighbors {
                let (t, coefficient) = node_metrics(node_neighbors, &neighbors);
                triangles.insert(node, t);
                coeffs.insert(node, coefficient);
            }
            (coeffs, triangles)
        } else {
            // Read-only adjacency shared by all worker closures.
            let neighbors = Arc::new(neighbors);
            // Materialize the entries so rayon can split them by index; this
            // also avoids a second hash lookup per node inside the workers.
            let entries: Vec<(&NodeId, &FxHashSet<NodeId>)> = neighbors.iter().collect();

            entries
                .par_iter()
                .fold(
                    || (FxHashMap::default(), FxHashMap::default()),
                    |(mut coeffs, mut triangles), &(&node, node_neighbors)| {
                        let (t, coefficient) = node_metrics(node_neighbors, &neighbors);
                        triangles.insert(node, t);
                        coeffs.insert(node, coefficient);
                        (coeffs, triangles)
                    },
                )
                .reduce(
                    || (FxHashMap::default(), FxHashMap::default()),
                    |(mut c1, mut t1), (c2, t2)| {
                        // Workers handle disjoint nodes, so merging never
                        // overwrites an existing key.
                        c1.extend(c2);
                        t1.extend(t2);
                        (c1, t1)
                    },
                )
        };

    // Each triangle was counted at all three of its corners.
    let total_triangles = triangle_counts.values().sum::<u64>() / 3;
    let global_coefficient = if n == 0 {
        0.0
    } else {
        coefficients.values().sum::<f64>() / n as f64
    };

    ClusteringCoefficientResult {
        coefficients,
        triangle_counts,
        total_triangles,
        global_coefficient,
    }
}
370
371static CLUSTERING_PARAMS: OnceLock<Vec<ParameterDef>> = OnceLock::new();
377
378fn clustering_params() -> &'static [ParameterDef] {
379 CLUSTERING_PARAMS.get_or_init(|| {
380 vec![
381 ParameterDef {
382 name: "parallel".to_string(),
383 description: "Enable parallel computation (default: true)".to_string(),
384 param_type: ParameterType::Boolean,
385 required: false,
386 default: Some("true".to_string()),
387 },
388 ParameterDef {
389 name: "parallel_threshold".to_string(),
390 description: "Minimum nodes for parallel execution (default: 50)".to_string(),
391 param_type: ParameterType::Integer,
392 required: false,
393 default: Some("50".to_string()),
394 },
395 ]
396 })
397}
398
/// Stateless handle that exposes the clustering-coefficient computation
/// through the [`GraphAlgorithm`] trait under the name
/// `"clustering_coefficient"`.
pub struct ClusteringCoefficientAlgorithm;
401
402impl GraphAlgorithm for ClusteringCoefficientAlgorithm {
403 fn name(&self) -> &str {
404 "clustering_coefficient"
405 }
406
407 fn description(&self) -> &str {
408 "Local and global clustering coefficients with triangle counts"
409 }
410
411 fn parameters(&self) -> &[ParameterDef] {
412 clustering_params()
413 }
414
415 fn execute(&self, store: &LpgStore, params: &Parameters) -> Result<AlgorithmResult> {
416 #[cfg(feature = "parallel")]
417 let result = {
418 let parallel = params.get_bool("parallel").unwrap_or(true);
419 let threshold = params.get_int("parallel_threshold").unwrap_or(50) as usize;
420
421 if parallel {
422 clustering_coefficient_parallel(store, threshold)
423 } else {
424 clustering_coefficient(store)
425 }
426 };
427
428 #[cfg(not(feature = "parallel"))]
429 let result = {
430 let _ = params; clustering_coefficient(store)
432 };
433
434 let mut output = AlgorithmResult::new(vec![
435 "node_id".to_string(),
436 "clustering_coefficient".to_string(),
437 "triangle_count".to_string(),
438 ]);
439
440 for (node, coefficient) in result.coefficients {
441 let triangles = *result.triangle_counts.get(&node).unwrap_or(&0);
442 output.add_row(vec![
443 Value::Int64(node.0 as i64),
444 Value::Float64(coefficient),
445 Value::Int64(triangles as i64),
446 ]);
447 }
448
449 Ok(output)
450 }
451}
452
#[cfg(feature = "parallel")]
impl ParallelGraphAlgorithm for ClusteringCoefficientAlgorithm {
    /// Node count below which the parallel entry point still runs
    /// sequentially.
    fn parallel_threshold(&self) -> usize {
        50
    }

    /// Runs the parallel implementation with this algorithm's own threshold
    /// and returns one row per node with the columns `node_id`,
    /// `clustering_coefficient` and `triangle_count`.
    ///
    /// `_params` and `_num_threads` are accepted for the trait signature but
    /// are not consulted here.
    fn execute_parallel(
        &self,
        store: &LpgStore,
        _params: &Parameters,
        _num_threads: usize,
    ) -> Result<AlgorithmResult> {
        let ClusteringCoefficientResult {
            coefficients,
            triangle_counts,
            ..
        } = clustering_coefficient_parallel(store, self.parallel_threshold());

        let columns: Vec<String> = ["node_id", "clustering_coefficient", "triangle_count"]
            .into_iter()
            .map(String::from)
            .collect();
        let mut table = AlgorithmResult::new(columns);

        for (node, coefficient) in coefficients {
            // Both maps share the same keys; a missing count reads as zero.
            let triangles = triangle_counts.get(&node).copied().unwrap_or(0);
            let row = vec![
                Value::Int64(node.0 as i64),
                Value::Float64(coefficient),
                Value::Int64(triangles as i64),
            ];
            table.add_row(row);
        }

        Ok(table)
    }
}
485
#[cfg(test)]
mod tests {
    use super::*;

    /// Adds the edge pair `a -> b` and `b -> a`, which is how every fixture
    /// in this module models an undirected link.
    fn link(store: &LpgStore, a: NodeId, b: NodeId) {
        store.create_edge(a, b, "EDGE");
        store.create_edge(b, a, "EDGE");
    }

    /// Three nodes, all pairwise linked.
    fn create_triangle_graph() -> LpgStore {
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        let n2 = store.create_node(&["Node"]);

        link(&store, n0, n1);
        link(&store, n1, n2);
        link(&store, n2, n0);

        store
    }

    /// One center node linked to four leaves; the leaves are not linked to
    /// each other.
    fn create_star_graph() -> LpgStore {
        let store = LpgStore::new();
        let center = store.create_node(&["Center"]);

        for _ in 0..4 {
            let leaf = store.create_node(&["Leaf"]);
            link(&store, center, leaf);
        }

        store
    }

    /// `n` nodes with every pair linked.
    fn create_complete_graph(n: usize) -> LpgStore {
        let store = LpgStore::new();
        let nodes: Vec<NodeId> = (0..n).map(|_| store.create_node(&["Node"])).collect();

        for (i, &a) in nodes.iter().enumerate() {
            for &b in &nodes[i + 1..] {
                link(&store, a, b);
            }
        }

        store
    }

    /// Four nodes in a chain: n0 - n1 - n2 - n3.
    fn create_path_graph() -> LpgStore {
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        let n2 = store.create_node(&["Node"]);
        let n3 = store.create_node(&["Node"]);

        link(&store, n0, n1);
        link(&store, n1, n2);
        link(&store, n2, n3);

        store
    }

    #[test]
    fn test_triangle_graph_clustering() {
        let store = create_triangle_graph();
        let result = clustering_coefficient(&store);

        // Every node's two neighbors are linked to each other.
        for coeff in result.coefficients.values() {
            assert!(
                (*coeff - 1.0).abs() < 1e-10,
                "Expected coefficient 1.0, got {}",
                coeff
            );
        }

        assert_eq!(result.total_triangles, 1);

        assert!(
            (result.global_coefficient - 1.0).abs() < 1e-10,
            "Expected global 1.0, got {}",
            result.global_coefficient
        );
    }

    #[test]
    fn test_star_graph_clustering() {
        let store = create_star_graph();
        let result = clustering_coefficient(&store);

        // No two leaves are linked, so no node closes a triangle.
        for coeff in result.coefficients.values() {
            assert_eq!(*coeff, 0.0);
        }

        assert_eq!(result.total_triangles, 0);
        assert_eq!(result.global_coefficient, 0.0);
    }

    #[test]
    fn test_complete_graph_clustering() {
        let store = create_complete_graph(5);
        let result = clustering_coefficient(&store);

        for coeff in result.coefficients.values() {
            assert!((*coeff - 1.0).abs() < 1e-10, "Expected 1.0, got {}", coeff);
        }

        // K5 contains C(5, 3) = 10 triangles.
        assert_eq!(result.total_triangles, 10);
    }

    #[test]
    fn test_path_graph_clustering() {
        let store = create_path_graph();
        let result = clustering_coefficient(&store);

        assert_eq!(result.total_triangles, 0);

        for coeff in result.coefficients.values() {
            assert_eq!(*coeff, 0.0);
        }
    }

    #[test]
    fn test_empty_graph() {
        let store = LpgStore::new();
        let result = clustering_coefficient(&store);

        assert!(result.coefficients.is_empty());
        assert!(result.triangle_counts.is_empty());
        assert_eq!(result.total_triangles, 0);
        assert_eq!(result.global_coefficient, 0.0);
    }

    #[test]
    fn test_single_node() {
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);

        let result = clustering_coefficient(&store);

        assert_eq!(result.coefficients.len(), 1);
        assert_eq!(*result.coefficients.get(&n0).unwrap(), 0.0);
        assert_eq!(result.total_triangles, 0);
    }

    #[test]
    fn test_two_connected_nodes() {
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        link(&store, n0, n1);

        let result = clustering_coefficient(&store);

        // A single neighbor leaves no pair to be connected.
        assert_eq!(*result.coefficients.get(&n0).unwrap(), 0.0);
        assert_eq!(*result.coefficients.get(&n1).unwrap(), 0.0);
        assert_eq!(result.total_triangles, 0);
    }

    #[test]
    fn test_triangle_count_function() {
        let store = create_triangle_graph();
        let counts = triangle_count(&store);

        // Each corner sees the one triangle.
        for count in counts.values() {
            assert_eq!(*count, 1);
        }
    }

    #[test]
    fn test_local_clustering_coefficient_function() {
        let store = create_complete_graph(4);
        let coefficients = local_clustering_coefficient(&store);

        for coeff in coefficients.values() {
            assert!((*coeff - 1.0).abs() < 1e-10);
        }
    }

    #[test]
    fn test_global_clustering_coefficient_function() {
        let store = create_triangle_graph();
        let global = global_clustering_coefficient(&store);
        assert!((global - 1.0).abs() < 1e-10);
    }

    #[test]
    fn test_total_triangles_function() {
        let store = create_complete_graph(4);
        let total = total_triangles(&store);
        // K4 contains C(4, 3) = 4 triangles.
        assert_eq!(total, 4);
    }

    #[cfg(feature = "parallel")]
    #[test]
    fn test_parallel_matches_sequential() {
        let store = create_complete_graph(20);

        let sequential = clustering_coefficient(&store);
        // A threshold of 1 forces the parallel path for this 20-node graph.
        let parallel = clustering_coefficient_parallel(&store, 1);

        for (node, seq_coeff) in &sequential.coefficients {
            let par_coeff = parallel.coefficients.get(node).unwrap();
            assert!(
                (seq_coeff - par_coeff).abs() < 1e-10,
                "Mismatch for node {:?}: seq={}, par={}",
                node,
                seq_coeff,
                par_coeff
            );
        }

        assert_eq!(sequential.total_triangles, parallel.total_triangles);
        assert!((sequential.global_coefficient - parallel.global_coefficient).abs() < 1e-10);
    }

    #[cfg(feature = "parallel")]
    #[test]
    fn test_parallel_threshold_fallback() {
        let store = create_triangle_graph();

        // Three nodes are below the threshold of 100, so the sequential
        // fallback is taken.
        let result = clustering_coefficient_parallel(&store, 100);

        assert_eq!(result.coefficients.len(), 3);
        assert_eq!(result.total_triangles, 1);
    }

    #[test]
    fn test_algorithm_wrapper() {
        let store = create_triangle_graph();
        let algo = ClusteringCoefficientAlgorithm;

        assert_eq!(algo.name(), "clustering_coefficient");
        assert!(!algo.description().is_empty());
        assert_eq!(algo.parameters().len(), 2);

        let params = Parameters::new();
        let result = algo.execute(&store, &params).unwrap();

        assert_eq!(result.columns.len(), 3);
        assert_eq!(result.row_count(), 3);
    }

    #[test]
    fn test_algorithm_wrapper_sequential() {
        let store = create_triangle_graph();
        let algo = ClusteringCoefficientAlgorithm;

        let mut params = Parameters::new();
        params.set_bool("parallel", false);

        let result = algo.execute(&store, &params).unwrap();
        assert_eq!(result.row_count(), 3);
    }

    #[cfg(feature = "parallel")]
    #[test]
    fn test_parallel_algorithm_trait() {
        let store = create_complete_graph(10);
        let algo = ClusteringCoefficientAlgorithm;

        assert_eq!(algo.parallel_threshold(), 50);

        let params = Parameters::new();
        let result = algo.execute_parallel(&store, &params, 4).unwrap();

        assert_eq!(result.row_count(), 10);
    }

    #[test]
    fn test_two_triangles_sharing_edge() {
        // Triangles (n0, n1, n2) and (n0, n1, n3) share the edge n0 - n1.
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        let n2 = store.create_node(&["Node"]);
        let n3 = store.create_node(&["Node"]);

        // First triangle.
        link(&store, n0, n1);
        link(&store, n1, n2);
        link(&store, n2, n0);

        // Second triangle, reusing n0 - n1.
        link(&store, n1, n3);
        link(&store, n3, n0);

        let result = clustering_coefficient(&store);

        assert_eq!(result.total_triangles, 2);

        // Nodes on the shared edge belong to both triangles.
        assert_eq!(*result.triangle_counts.get(&n0).unwrap(), 2);
        assert_eq!(*result.triangle_counts.get(&n1).unwrap(), 2);

        assert_eq!(*result.triangle_counts.get(&n2).unwrap(), 1);
        assert_eq!(*result.triangle_counts.get(&n3).unwrap(), 1);

        // n0 has three neighbors (three possible pairs), two of which are
        // linked: (n1, n2) and (n1, n3).
        let coeff_0 = *result.coefficients.get(&n0).unwrap();
        assert!(
            (coeff_0 - 2.0 / 3.0).abs() < 1e-10,
            "Expected 2/3, got {}",
            coeff_0
        );
    }
}