1use std::sync::{Arc, OnceLock};
7
8use grafeo_common::types::{NodeId, Value};
9use grafeo_common::utils::error::Result;
10use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
11use grafeo_core::graph::Direction;
12use grafeo_core::graph::lpg::LpgStore;
13use rayon::prelude::*;
14
15use super::super::{AlgorithmResult, ParameterDef, ParameterType, Parameters};
16use super::traits::{GraphAlgorithm, ParallelGraphAlgorithm};
17
18#[derive(Debug, Clone)]
24pub struct ClusteringCoefficientResult {
25 pub coefficients: FxHashMap<NodeId, f64>,
27 pub triangle_counts: FxHashMap<NodeId, u64>,
29 pub total_triangles: u64,
31 pub global_coefficient: f64,
33}
34
35fn build_undirected_neighbors(store: &LpgStore) -> FxHashMap<NodeId, FxHashSet<NodeId>> {
43 let nodes = store.node_ids();
44 let mut neighbors: FxHashMap<NodeId, FxHashSet<NodeId>> = FxHashMap::default();
45
46 for &node in &nodes {
48 neighbors.insert(node, FxHashSet::default());
49 }
50
51 for &node in &nodes {
53 for (neighbor, _) in store.edges_from(node, Direction::Outgoing) {
55 if let Some(set) = neighbors.get_mut(&node) {
56 set.insert(neighbor);
57 }
58 if let Some(set) = neighbors.get_mut(&neighbor) {
60 set.insert(node);
61 }
62 }
63
64 for (neighbor, _) in store.edges_from(node, Direction::Incoming) {
66 if let Some(set) = neighbors.get_mut(&node) {
67 set.insert(neighbor);
68 }
69 if let Some(set) = neighbors.get_mut(&neighbor) {
70 set.insert(node);
71 }
72 }
73 }
74
75 neighbors
76}
77
78fn count_node_triangles(
80 node_neighbors: &FxHashSet<NodeId>,
81 all_neighbors: &FxHashMap<NodeId, FxHashSet<NodeId>>,
82) -> u64 {
83 let neighbor_list: Vec<NodeId> = node_neighbors.iter().copied().collect();
84 let k = neighbor_list.len();
85 let mut triangles = 0u64;
86
87 for i in 0..k {
89 for j in (i + 1)..k {
90 let u = neighbor_list[i];
91 let w = neighbor_list[j];
92
93 if let Some(u_neighbors) = all_neighbors.get(&u) {
95 if u_neighbors.contains(&w) {
96 triangles += 1;
97 }
98 }
99 }
100 }
101
102 triangles
103}
104
105pub fn triangle_count(store: &LpgStore) -> FxHashMap<NodeId, u64> {
126 let neighbors = build_undirected_neighbors(store);
127 let mut counts: FxHashMap<NodeId, u64> = FxHashMap::default();
128
129 for (&node, node_neighbors) in &neighbors {
130 let triangles = count_node_triangles(node_neighbors, &neighbors);
131 counts.insert(node, triangles);
132 }
133
134 counts
135}
136
137pub fn local_clustering_coefficient(store: &LpgStore) -> FxHashMap<NodeId, f64> {
158 let neighbors = build_undirected_neighbors(store);
159 let mut coefficients: FxHashMap<NodeId, f64> = FxHashMap::default();
160
161 for (&node, node_neighbors) in &neighbors {
162 let k = node_neighbors.len();
163
164 if k < 2 {
165 coefficients.insert(node, 0.0);
167 } else {
168 let triangles = count_node_triangles(node_neighbors, &neighbors);
169 let max_triangles = (k * (k - 1)) / 2;
170 let coefficient = triangles as f64 / max_triangles as f64;
171 coefficients.insert(node, coefficient);
172 }
173 }
174
175 coefficients
176}
177
178pub fn global_clustering_coefficient(store: &LpgStore) -> f64 {
195 let local = local_clustering_coefficient(store);
196
197 if local.is_empty() {
198 return 0.0;
199 }
200
201 let sum: f64 = local.values().sum();
202 sum / local.len() as f64
203}
204
205pub fn total_triangles(store: &LpgStore) -> u64 {
221 let per_node = triangle_count(store);
222 per_node.values().sum::<u64>() / 3
224}
225
226pub fn clustering_coefficient(store: &LpgStore) -> ClusteringCoefficientResult {
244 let neighbors = build_undirected_neighbors(store);
245 let n = neighbors.len();
246
247 let mut coefficients: FxHashMap<NodeId, f64> = FxHashMap::default();
248 let mut triangle_counts: FxHashMap<NodeId, u64> = FxHashMap::default();
249
250 for (&node, node_neighbors) in &neighbors {
251 let k = node_neighbors.len();
252 let triangles = count_node_triangles(node_neighbors, &neighbors);
253
254 triangle_counts.insert(node, triangles);
255
256 let coefficient = if k < 2 {
257 0.0
258 } else {
259 let max_triangles = (k * (k - 1)) / 2;
260 triangles as f64 / max_triangles as f64
261 };
262 coefficients.insert(node, coefficient);
263 }
264
265 let total_triangles = triangle_counts.values().sum::<u64>() / 3;
266 let global_coefficient = if n == 0 {
267 0.0
268 } else {
269 coefficients.values().sum::<f64>() / n as f64
270 };
271
272 ClusteringCoefficientResult {
273 coefficients,
274 triangle_counts,
275 total_triangles,
276 global_coefficient,
277 }
278}
279
280pub fn clustering_coefficient_parallel(
302 store: &LpgStore,
303 parallel_threshold: usize,
304) -> ClusteringCoefficientResult {
305 let neighbors = build_undirected_neighbors(store);
306 let n = neighbors.len();
307
308 if n < parallel_threshold {
309 return clustering_coefficient(store);
311 }
312
313 let neighbors = Arc::new(neighbors);
315 let nodes: Vec<NodeId> = neighbors.keys().copied().collect();
316
317 let (coefficients, triangle_counts): (FxHashMap<NodeId, f64>, FxHashMap<NodeId, u64>) = nodes
319 .par_iter()
320 .fold(
321 || (FxHashMap::default(), FxHashMap::default()),
322 |(mut coeffs, mut triangles), &node| {
323 let node_neighbors = neighbors.get(&node).unwrap();
324 let k = node_neighbors.len();
325
326 let t = count_node_triangles(node_neighbors, &neighbors);
327
328 triangles.insert(node, t);
329
330 let coefficient = if k < 2 {
331 0.0
332 } else {
333 let max_triangles = (k * (k - 1)) / 2;
334 t as f64 / max_triangles as f64
335 };
336 coeffs.insert(node, coefficient);
337
338 (coeffs, triangles)
339 },
340 )
341 .reduce(
342 || (FxHashMap::default(), FxHashMap::default()),
343 |(mut c1, mut t1), (c2, t2)| {
344 c1.extend(c2);
345 t1.extend(t2);
346 (c1, t1)
347 },
348 );
349
350 let total_triangles = triangle_counts.values().sum::<u64>() / 3;
351 let global_coefficient = if n == 0 {
352 0.0
353 } else {
354 coefficients.values().sum::<f64>() / n as f64
355 };
356
357 ClusteringCoefficientResult {
358 coefficients,
359 triangle_counts,
360 total_triangles,
361 global_coefficient,
362 }
363}
364
365static CLUSTERING_PARAMS: OnceLock<Vec<ParameterDef>> = OnceLock::new();
371
372fn clustering_params() -> &'static [ParameterDef] {
373 CLUSTERING_PARAMS.get_or_init(|| {
374 vec![
375 ParameterDef {
376 name: "parallel".to_string(),
377 description: "Enable parallel computation (default: true)".to_string(),
378 param_type: ParameterType::Boolean,
379 required: false,
380 default: Some("true".to_string()),
381 },
382 ParameterDef {
383 name: "parallel_threshold".to_string(),
384 description: "Minimum nodes for parallel execution (default: 50)".to_string(),
385 param_type: ParameterType::Integer,
386 required: false,
387 default: Some("50".to_string()),
388 },
389 ]
390 })
391}
392
393pub struct ClusteringCoefficientAlgorithm;
395
396impl GraphAlgorithm for ClusteringCoefficientAlgorithm {
397 fn name(&self) -> &str {
398 "clustering_coefficient"
399 }
400
401 fn description(&self) -> &str {
402 "Local and global clustering coefficients with triangle counts"
403 }
404
405 fn parameters(&self) -> &[ParameterDef] {
406 clustering_params()
407 }
408
409 fn execute(&self, store: &LpgStore, params: &Parameters) -> Result<AlgorithmResult> {
410 let parallel = params.get_bool("parallel").unwrap_or(true);
411 let threshold = params.get_int("parallel_threshold").unwrap_or(50) as usize;
412
413 let result = if parallel {
414 clustering_coefficient_parallel(store, threshold)
415 } else {
416 clustering_coefficient(store)
417 };
418
419 let mut output = AlgorithmResult::new(vec![
420 "node_id".to_string(),
421 "clustering_coefficient".to_string(),
422 "triangle_count".to_string(),
423 ]);
424
425 for (node, coefficient) in result.coefficients {
426 let triangles = *result.triangle_counts.get(&node).unwrap_or(&0);
427 output.add_row(vec![
428 Value::Int64(node.0 as i64),
429 Value::Float64(coefficient),
430 Value::Int64(triangles as i64),
431 ]);
432 }
433
434 Ok(output)
435 }
436}
437
438impl ParallelGraphAlgorithm for ClusteringCoefficientAlgorithm {
439 fn parallel_threshold(&self) -> usize {
440 50
441 }
442
443 fn execute_parallel(
444 &self,
445 store: &LpgStore,
446 _params: &Parameters,
447 _num_threads: usize,
448 ) -> Result<AlgorithmResult> {
449 let result = clustering_coefficient_parallel(store, self.parallel_threshold());
450
451 let mut output = AlgorithmResult::new(vec![
452 "node_id".to_string(),
453 "clustering_coefficient".to_string(),
454 "triangle_count".to_string(),
455 ]);
456
457 for (node, coefficient) in result.coefficients {
458 let triangles = *result.triangle_counts.get(&node).unwrap_or(&0);
459 output.add_row(vec![
460 Value::Int64(node.0 as i64),
461 Value::Float64(coefficient),
462 Value::Int64(triangles as i64),
463 ]);
464 }
465
466 Ok(output)
467 }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// Adds the edges `a -> b` and `b -> a`, in that order.
    fn link_both_ways(store: &LpgStore, a: NodeId, b: NodeId) {
        store.create_edge(a, b, "EDGE");
        store.create_edge(b, a, "EDGE");
    }

    /// Three mutually connected nodes: exactly one triangle.
    fn create_triangle_graph() -> LpgStore {
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        let n2 = store.create_node(&["Node"]);

        link_both_ways(&store, n0, n1);
        link_both_ways(&store, n1, n2);
        link_both_ways(&store, n2, n0);

        store
    }

    /// One centre connected to four leaves; the leaves are not connected.
    fn create_star_graph() -> LpgStore {
        let store = LpgStore::new();
        let center = store.create_node(&["Center"]);

        for _ in 0..4 {
            let leaf = store.create_node(&["Leaf"]);
            link_both_ways(&store, center, leaf);
        }

        store
    }

    /// Complete graph on `n` nodes.
    fn create_complete_graph(n: usize) -> LpgStore {
        let store = LpgStore::new();
        let nodes: Vec<NodeId> = (0..n).map(|_| store.create_node(&["Node"])).collect();

        for i in 0..n {
            for j in (i + 1)..n {
                link_both_ways(&store, nodes[i], nodes[j]);
            }
        }

        store
    }

    /// Path n0 - n1 - n2 - n3: no triangles.
    fn create_path_graph() -> LpgStore {
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        let n2 = store.create_node(&["Node"]);
        let n3 = store.create_node(&["Node"]);

        link_both_ways(&store, n0, n1);
        link_both_ways(&store, n1, n2);
        link_both_ways(&store, n2, n3);

        store
    }

    #[test]
    fn test_triangle_graph_clustering() {
        let store = create_triangle_graph();
        let result = clustering_coefficient(&store);

        // Every node's two neighbours are connected to each other.
        for coeff in result.coefficients.values() {
            assert!(
                (*coeff - 1.0).abs() < 1e-10,
                "Expected coefficient 1.0, got {}",
                coeff
            );
        }

        assert_eq!(result.total_triangles, 1);

        assert!(
            (result.global_coefficient - 1.0).abs() < 1e-10,
            "Expected global 1.0, got {}",
            result.global_coefficient
        );
    }

    #[test]
    fn test_star_graph_clustering() {
        let store = create_star_graph();
        let result = clustering_coefficient(&store);

        // No two neighbours of any node are connected.
        for coeff in result.coefficients.values() {
            assert_eq!(*coeff, 0.0);
        }

        assert_eq!(result.total_triangles, 0);
        assert_eq!(result.global_coefficient, 0.0);
    }

    #[test]
    fn test_complete_graph_clustering() {
        let store = create_complete_graph(5);
        let result = clustering_coefficient(&store);

        for coeff in result.coefficients.values() {
            assert!((*coeff - 1.0).abs() < 1e-10, "Expected 1.0, got {}", coeff);
        }

        // K5 contains C(5, 3) = 10 triangles.
        assert_eq!(result.total_triangles, 10);
    }

    #[test]
    fn test_path_graph_clustering() {
        let store = create_path_graph();
        let result = clustering_coefficient(&store);

        assert_eq!(result.total_triangles, 0);

        for coeff in result.coefficients.values() {
            assert_eq!(*coeff, 0.0);
        }
    }

    #[test]
    fn test_empty_graph() {
        let store = LpgStore::new();
        let result = clustering_coefficient(&store);

        assert!(result.coefficients.is_empty());
        assert!(result.triangle_counts.is_empty());
        assert_eq!(result.total_triangles, 0);
        assert_eq!(result.global_coefficient, 0.0);
    }

    #[test]
    fn test_single_node() {
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);

        let result = clustering_coefficient(&store);

        assert_eq!(result.coefficients.len(), 1);
        assert_eq!(*result.coefficients.get(&n0).unwrap(), 0.0);
        assert_eq!(result.total_triangles, 0);
    }

    #[test]
    fn test_two_connected_nodes() {
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        link_both_ways(&store, n0, n1);

        let result = clustering_coefficient(&store);

        // A single neighbour is not enough to form a triangle.
        assert_eq!(*result.coefficients.get(&n0).unwrap(), 0.0);
        assert_eq!(*result.coefficients.get(&n1).unwrap(), 0.0);
        assert_eq!(result.total_triangles, 0);
    }

    #[test]
    fn test_triangle_count_function() {
        let store = create_triangle_graph();
        let counts = triangle_count(&store);

        for count in counts.into_values() {
            assert_eq!(count, 1);
        }
    }

    #[test]
    fn test_local_clustering_coefficient_function() {
        let store = create_complete_graph(4);
        let coefficients = local_clustering_coefficient(&store);

        for coeff in coefficients.into_values() {
            assert!((coeff - 1.0).abs() < 1e-10);
        }
    }

    #[test]
    fn test_global_clustering_coefficient_function() {
        let store = create_triangle_graph();
        let global = global_clustering_coefficient(&store);
        assert!((global - 1.0).abs() < 1e-10);
    }

    #[test]
    fn test_total_triangles_function() {
        let store = create_complete_graph(4);
        let total = total_triangles(&store);
        // K4 contains C(4, 3) = 4 triangles.
        assert_eq!(total, 4);
    }

    #[test]
    fn test_parallel_matches_sequential() {
        let store = create_complete_graph(20);

        let sequential = clustering_coefficient(&store);
        // A threshold of 1 forces the parallel code path.
        let parallel = clustering_coefficient_parallel(&store, 1);

        for (node, seq_coeff) in &sequential.coefficients {
            let par_coeff = parallel.coefficients.get(node).unwrap();
            assert!(
                (seq_coeff - par_coeff).abs() < 1e-10,
                "Mismatch for node {:?}: seq={}, par={}",
                node,
                seq_coeff,
                par_coeff
            );
        }

        assert_eq!(sequential.total_triangles, parallel.total_triangles);
        assert!((sequential.global_coefficient - parallel.global_coefficient).abs() < 1e-10);
    }

    #[test]
    fn test_parallel_threshold_fallback() {
        let store = create_triangle_graph();

        // Three nodes are below the threshold, so the sequential path is used.
        let result = clustering_coefficient_parallel(&store, 100);

        assert_eq!(result.coefficients.len(), 3);
        assert_eq!(result.total_triangles, 1);
    }

    #[test]
    fn test_algorithm_wrapper() {
        let store = create_triangle_graph();
        let algo = ClusteringCoefficientAlgorithm;

        assert_eq!(algo.name(), "clustering_coefficient");
        assert!(!algo.description().is_empty());
        assert_eq!(algo.parameters().len(), 2);

        let params = Parameters::new();
        let result = algo.execute(&store, &params).unwrap();

        assert_eq!(result.columns.len(), 3);
        assert_eq!(result.row_count(), 3);
    }

    #[test]
    fn test_algorithm_wrapper_sequential() {
        let store = create_triangle_graph();
        let algo = ClusteringCoefficientAlgorithm;

        let mut params = Parameters::new();
        params.set_bool("parallel", false);

        let result = algo.execute(&store, &params).unwrap();
        assert_eq!(result.row_count(), 3);
    }

    #[test]
    fn test_parallel_algorithm_trait() {
        let store = create_complete_graph(10);
        let algo = ClusteringCoefficientAlgorithm;

        assert_eq!(algo.parallel_threshold(), 50);

        let params = Parameters::new();
        let result = algo.execute_parallel(&store, &params, 4).unwrap();

        assert_eq!(result.row_count(), 10);
    }

    #[test]
    fn test_two_triangles_sharing_edge() {
        // Triangles (n0, n1, n2) and (n0, n1, n3) share the edge n0 - n1.
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        let n2 = store.create_node(&["Node"]);
        let n3 = store.create_node(&["Node"]);

        // First triangle.
        link_both_ways(&store, n0, n1);
        link_both_ways(&store, n1, n2);
        link_both_ways(&store, n2, n0);

        // Second triangle reuses n0 - n1.
        link_both_ways(&store, n1, n3);
        link_both_ways(&store, n3, n0);

        let result = clustering_coefficient(&store);

        assert_eq!(result.total_triangles, 2);

        // The shared endpoints take part in both triangles.
        assert_eq!(*result.triangle_counts.get(&n0).unwrap(), 2);
        assert_eq!(*result.triangle_counts.get(&n1).unwrap(), 2);

        assert_eq!(*result.triangle_counts.get(&n2).unwrap(), 1);
        assert_eq!(*result.triangle_counts.get(&n3).unwrap(), 1);

        // n0 has 3 neighbours (3 possible pairs), 2 of which are connected.
        let coeff_0 = *result.coefficients.get(&n0).unwrap();
        assert!(
            (coeff_0 - 2.0 / 3.0).abs() < 1e-10,
            "Expected 2/3, got {}",
            coeff_0
        );
    }
}