1use std::collections::HashMap;
19use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
20use std::sync::Arc;
21
22use crossbeam_queue::SegQueue;
23use rayon::prelude::*;
24
25use crate::errors::GraphResult;
26use crate::graph::traits::{GraphBase, GraphQuery};
27use crate::graph::Graph;
28use crate::node::NodeIndex;
29
30pub fn par_dfs<T, F>(graph: &Graph<T, impl Clone + Send + Sync>, start: NodeIndex, visitor: F)
34where
35 T: Clone + Send + Sync,
36 F: Fn(NodeIndex) -> bool + Send + Sync,
37{
38 let n = graph.node_count();
39 if n == 0 {
40 return;
41 }
42
43 let visited: Vec<AtomicBool> = (0..n).map(|_| AtomicBool::new(false)).collect();
44 let visited = Arc::new(visited);
45
46 visited[start.index()].store(true, Ordering::SeqCst);
48
49 let neighbors: Vec<NodeIndex> = graph.neighbors(start).collect();
51
52 neighbors.into_par_iter().for_each(|neighbor| {
54 if !visited[neighbor.index()].load(Ordering::Relaxed)
55 && visited[neighbor.index()]
56 .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
57 .is_ok()
58 && visitor(neighbor)
59 {
60 par_dfs_subtree(graph, neighbor, &visited, &visitor);
61 }
62 });
63}
64
65fn par_dfs_subtree<T, F>(
67 graph: &Graph<T, impl Clone + Send + Sync>,
68 node: NodeIndex,
69 visited: &Vec<AtomicBool>,
70 visitor: &F,
71) where
72 T: Clone + Send + Sync,
73 F: Fn(NodeIndex) -> bool + Send + Sync,
74{
75 let unvisited_neighbors: Vec<NodeIndex> = graph
77 .neighbors(node)
78 .filter(|&n| {
79 !visited[n.index()].load(Ordering::Relaxed)
80 && visited[n.index()]
81 .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
82 .is_ok()
83 })
84 .collect();
85
86 if unvisited_neighbors.is_empty() {
87 return;
88 }
89
90 if unvisited_neighbors.len() >= 4 {
92 unvisited_neighbors.into_par_iter().for_each(|neighbor| {
93 if visitor(neighbor) {
94 par_dfs_subtree(graph, neighbor, visited, visitor);
95 }
96 });
97 } else {
98 for neighbor in unvisited_neighbors {
100 if visitor(neighbor) {
101 par_dfs_subtree(graph, neighbor, visited, visitor);
102 }
103 }
104 }
105}
106
107pub fn par_pagerank<T>(
112 graph: &Graph<T, impl Clone + Send + Sync>,
113 damping: f64,
114 iterations: usize,
115) -> HashMap<NodeIndex, f64>
116where
117 T: Clone + Send + Sync,
118{
119 let n = graph.node_count();
120 if n == 0 {
121 return HashMap::new();
122 }
123
124 let node_indices: Vec<NodeIndex> = graph.nodes().map(|n| n.index()).collect();
126 let node_to_pos: HashMap<NodeIndex, usize> = node_indices
127 .iter()
128 .enumerate()
129 .map(|(i, &ni)| (ni, i))
130 .collect();
131
132 let out_degrees: Vec<usize> = node_indices
134 .iter()
135 .map(|&ni| graph.out_degree(ni).unwrap_or(0))
136 .collect();
137
138 let mut incoming: Vec<Vec<usize>> = vec![Vec::new(); n];
141 for edge in graph.edges() {
142 let src = edge.source();
143 let tgt = edge.target();
144 if let (Some(&src_pos), Some(&tgt_pos)) = (node_to_pos.get(&src), node_to_pos.get(&tgt)) {
145 incoming[tgt_pos].push(src_pos);
146 }
147 }
148
149 let mut scores: Vec<f64> = vec![1.0 / n as f64; n];
151
152 for _ in 0..iterations {
153 let new_scores: Vec<f64> = (0..n)
155 .into_par_iter()
156 .map(|i| {
157 let mut rank = (1.0 - damping) / n as f64;
159
160 for &neighbor_pos in &incoming[i] {
162 let out_degree = out_degrees[neighbor_pos];
163 if out_degree > 0 {
164 rank += damping * scores[neighbor_pos] / out_degree as f64;
165 }
166 }
167
168 rank
169 })
170 .collect();
171
172 scores = new_scores;
173 }
174
175 node_indices
177 .into_iter()
178 .enumerate()
179 .map(|(i, ni)| (ni, scores[i]))
180 .collect()
181}
182
183#[cfg(feature = "simd")]
232pub fn par_pagerank_simd<T>(
233 graph: &Graph<T, impl Clone + Send + Sync>,
234 damping: f64,
235 iterations: usize,
236) -> HashMap<NodeIndex, f64>
237where
238 T: Clone + Send + Sync,
239{
240 use wide::f64x4;
241
242 let n = graph.node_count();
243 if n == 0 {
244 return HashMap::new();
245 }
246
247 let node_indices: Vec<NodeIndex> = graph.nodes().map(|n| n.index()).collect();
249 let node_to_pos: HashMap<NodeIndex, usize> = node_indices
250 .iter()
251 .enumerate()
252 .map(|(i, &ni)| (ni, i))
253 .collect();
254
255 let out_degrees: Vec<usize> = node_indices
257 .iter()
258 .map(|&ni| graph.out_degree(ni).unwrap_or(0))
259 .collect();
260
261 let mut incoming: Vec<Vec<usize>> = vec![Vec::new(); n];
263 for edge in graph.edges() {
264 let src = edge.source();
265 let tgt = edge.target();
266 if let (Some(&src_pos), Some(&tgt_pos)) = (node_to_pos.get(&src), node_to_pos.get(&tgt)) {
267 incoming[tgt_pos].push(src_pos);
268 }
269 }
270
271 let mut scores: Vec<f64> = vec![1.0 / n as f64; n];
273
274 let base_rank = (1.0 - damping) / n as f64;
276 let damping_simd = f64x4::new([damping; 4]);
277
278 for _ in 0..iterations {
279 let new_scores: Vec<f64> = (0..n)
281 .into_par_iter()
282 .map(|i| {
283 let mut rank = base_rank;
285
286 let neighbors = &incoming[i];
287 let len = neighbors.len();
288
289 let mut j = 0;
291 while j + 4 <= len {
292 let neighbor_indices = [
294 neighbors[j],
295 neighbors[j + 1],
296 neighbors[j + 2],
297 neighbors[j + 3],
298 ];
299
300 let scores_array = [
302 scores[neighbor_indices[0]],
303 scores[neighbor_indices[1]],
304 scores[neighbor_indices[2]],
305 scores[neighbor_indices[3]],
306 ];
307 let scores_simd = f64x4::new(scores_array);
308
309 let inv_degrees = [
311 if out_degrees[neighbor_indices[0]] > 0 {
312 1.0 / out_degrees[neighbor_indices[0]] as f64
313 } else {
314 0.0
315 },
316 if out_degrees[neighbor_indices[1]] > 0 {
317 1.0 / out_degrees[neighbor_indices[1]] as f64
318 } else {
319 0.0
320 },
321 if out_degrees[neighbor_indices[2]] > 0 {
322 1.0 / out_degrees[neighbor_indices[2]] as f64
323 } else {
324 0.0
325 },
326 if out_degrees[neighbor_indices[3]] > 0 {
327 1.0 / out_degrees[neighbor_indices[3]] as f64
328 } else {
329 0.0
330 },
331 ];
332 let inv_degrees_simd = f64x4::new(inv_degrees);
333
334 let contributions = damping_simd * scores_simd * inv_degrees_simd;
336
337 let sum: [f64; 4] = contributions.into();
339 rank += sum[0] + sum[1] + sum[2] + sum[3];
340
341 j += 4;
342 }
343
344 while j < len {
346 let neighbor_pos = neighbors[j];
347 let out_degree = out_degrees[neighbor_pos];
348 if out_degree > 0 {
349 rank += damping * scores[neighbor_pos] / out_degree as f64;
350 }
351 j += 1;
352 }
353
354 rank
355 })
356 .collect();
357
358 scores = new_scores;
359 }
360
361 node_indices
363 .into_iter()
364 .enumerate()
365 .map(|(i, ni)| (ni, scores[i]))
366 .collect()
367}
368
369#[cfg(feature = "simd")]
382pub fn par_degree_centrality_simd<T>(
383 graph: &Graph<T, impl Clone + Send + Sync>,
384) -> HashMap<NodeIndex, f64>
385where
386 T: Clone + Send + Sync,
387{
388 let n = graph.node_count();
389 if n <= 1 {
390 return HashMap::new();
391 }
392
393 let node_indices: Vec<NodeIndex> = graph.nodes().map(|n| n.index()).collect();
394 let norm = 1.0 / (n - 1) as f64;
395
396 let centralities: Vec<f64> = node_indices
399 .par_iter()
400 .map(|&ni| {
401 let degree = graph.out_degree(ni).unwrap_or(0) as f64;
402 degree * norm
403 })
404 .collect();
405
406 node_indices.into_iter().zip(centralities).collect()
408}
409
410pub fn par_bfs<T, F>(graph: &Graph<T, impl Clone + Send + Sync>, start: NodeIndex, visitor: F)
415where
416 T: Clone + Send + Sync,
417 F: Fn(NodeIndex, usize) -> bool + Send + Sync,
418{
419 let n = graph.node_count();
420 let visited: Vec<AtomicBool> = (0..n).map(|_| AtomicBool::new(false)).collect();
421 let visited = Arc::new(visited);
422
423 visited[start.index()].store(true, Ordering::SeqCst);
425
426 let mut current_layer = vec![start];
427 let mut depth = 0;
428
429 while !current_layer.is_empty() {
430 let next_layer_vecs: Vec<Vec<NodeIndex>> = current_layer
432 .par_iter()
433 .filter_map(|&node| {
434 if !visitor(node, depth) {
435 return None;
436 }
437
438 let neighbors: Vec<NodeIndex> = graph
439 .neighbors(node)
440 .filter(|&neighbor| {
441 !visited[neighbor.index()].load(Ordering::Relaxed)
442 && visited[neighbor.index()]
443 .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
444 .is_ok()
445 })
446 .collect();
447
448 if neighbors.is_empty() {
449 None
450 } else {
451 Some(neighbors)
452 }
453 })
454 .collect();
455
456 let mut next_layer = Vec::new();
458 for layer_vec in next_layer_vecs {
459 next_layer.extend(layer_vec);
460 }
461
462 depth += 1;
463 current_layer = next_layer;
464 }
465}
466
467pub fn par_connected_components<T>(
476 graph: &Graph<T, impl Clone + Send + Sync>,
477) -> Vec<Vec<NodeIndex>>
478where
479 T: Clone + Send + Sync,
480{
481 let node_indices: Vec<NodeIndex> = graph.nodes().map(|n| n.index()).collect();
482 let n = node_indices.len();
483
484 if n == 0 {
485 return Vec::new();
486 }
487
488 let parent: Vec<AtomicUsize> = (0..n).map(AtomicUsize::new).collect();
490
491 for (i, atomic) in parent.iter().enumerate().take(n) {
493 atomic.store(i, Ordering::Relaxed);
494 }
495
496 fn find(parent: &[AtomicUsize], mut i: usize) -> usize {
498 loop {
499 let p = parent[i].load(Ordering::Relaxed);
500 if p == i {
501 return p;
502 }
503 i = p;
504 }
505 }
506
507 fn union_atomic(parent: &[AtomicUsize], i: usize, j: usize) {
509 let root_i = find(parent, i);
510 let root_j = find(parent, j);
511
512 if root_i == root_j {
513 return;
514 }
515
516 let (old_root, new_root) = if root_i < root_j {
518 (root_i, root_j)
519 } else {
520 (root_j, root_i)
521 };
522
523 let _ = parent[old_root].compare_exchange(
525 old_root,
526 new_root,
527 Ordering::SeqCst,
528 Ordering::Relaxed,
529 );
530 }
531
532 let edges: Vec<(usize, usize)> = graph
534 .edges()
535 .flat_map(|edge| {
536 let source_idx = edge.source().index();
537 let target_idx = edge.target().index();
538 vec![(source_idx, target_idx), (target_idx, source_idx)]
539 })
540 .collect();
541
542 edges.par_iter().for_each(|&(src, tgt)| {
543 union_atomic(&parent, src, tgt);
544 });
545
546 let mut components_map: HashMap<usize, Vec<NodeIndex>> = HashMap::new();
548
549 for &node in &node_indices {
550 let root = find(&parent, node.index());
551 components_map.entry(root).or_default().push(node);
552 }
553
554 components_map.into_values().collect()
555}
556
557pub fn par_degree_centrality<T>(
559 graph: &Graph<T, impl Clone + Send + Sync>,
560) -> HashMap<NodeIndex, f64>
561where
562 T: Clone + Send + Sync,
563{
564 let n = graph.node_count();
565 if n <= 1 {
566 return HashMap::new();
567 }
568
569 let node_indices: Vec<NodeIndex> = graph.nodes().map(|n| n.index()).collect();
570 let norm = 1.0 / (n - 1) as f64;
571
572 node_indices
573 .par_iter()
574 .map(|&ni| {
575 let degree = graph.out_degree(ni).unwrap_or(0) as f64;
576 (ni, degree * norm)
577 })
578 .collect()
579}
580
581pub fn par_dijkstra<T, E, F>(
609 graph: &Graph<T, E>,
610 source: NodeIndex,
611 get_weight: F,
612 delta: f64,
613) -> GraphResult<HashMap<NodeIndex, f64>>
614where
615 T: Clone + Send + Sync,
616 E: Clone + Send + Sync,
617 F: Fn(NodeIndex, NodeIndex, &E) -> f64 + Send + Sync,
618{
619 let n = graph.node_count();
620 if n == 0 {
621 return Ok(HashMap::new());
622 }
623
624 let node_indices: Vec<NodeIndex> = graph.nodes().map(|n| n.index()).collect();
626 let node_to_pos: HashMap<NodeIndex, usize> = node_indices
627 .iter()
628 .enumerate()
629 .map(|(i, &ni)| (ni, i))
630 .collect();
631
632 let distances: Vec<AtomicU64> = (0..n)
634 .map(|_| AtomicU64::new(f64::INFINITY.to_bits()))
635 .collect();
636
637 if let Some(&source_pos) = node_to_pos.get(&source) {
639 distances[source_pos].store(0.0_f64.to_bits(), Ordering::Relaxed);
640 }
641
642 let buckets: Vec<SegQueue<usize>> = (0..10000).map(|_| SegQueue::new()).collect();
645 let buckets = Arc::new(buckets);
646
647 if let Some(&source_pos) = node_to_pos.get(&source) {
649 buckets[0].push(source_pos);
650 }
651
652 let settled: Vec<AtomicBool> = (0..n).map(|_| AtomicBool::new(false)).collect();
654
655 let mut current_bucket = 0;
656 let mut empty_count = 0;
657 let max_empty_buckets = 100;
658
659 loop {
660 let mut nodes_to_process: Vec<usize> = Vec::new();
662 while let Some(node_pos) = buckets[current_bucket].pop() {
663 nodes_to_process.push(node_pos);
664 }
665
666 if nodes_to_process.is_empty() {
667 empty_count += 1;
668 if empty_count >= max_empty_buckets {
669 break;
671 }
672 current_bucket += 1;
673 continue;
674 }
675
676 empty_count = 0;
677
678 let nodes_to_process_clone = nodes_to_process.clone();
680
681 nodes_to_process.into_par_iter().for_each(|node_pos| {
683 if settled[node_pos].load(Ordering::Relaxed) {
684 return;
685 }
686
687 let node = node_indices[node_pos];
688 let node_dist = {
689 let dist_bits = distances[node_pos].load(Ordering::Relaxed);
690 f64::from_bits(dist_bits)
691 };
692
693 for neighbor in graph.neighbors(node) {
695 if let Some(&neighbor_pos) = node_to_pos.get(&neighbor) {
696 if settled[neighbor_pos].load(Ordering::Relaxed) {
697 continue;
698 }
699
700 if let Ok(edge_data) = graph.get_edge_by_nodes(node, neighbor) {
701 let weight = get_weight(node, neighbor, edge_data);
702 let new_dist = node_dist + weight;
703
704 let mut current_bits = distances[neighbor_pos].load(Ordering::Relaxed);
706 loop {
707 let current_dist = f64::from_bits(current_bits);
708 if new_dist >= current_dist {
709 break;
710 }
711 let new_bits = new_dist.to_bits();
712 match distances[neighbor_pos].compare_exchange(
713 current_bits,
714 new_bits,
715 Ordering::Relaxed,
716 Ordering::Relaxed,
717 ) {
718 Ok(_) => {
719 let bucket_idx = ((new_dist / delta).floor() as usize)
721 .saturating_add(1)
722 .min(buckets.len() - 1);
723 buckets[bucket_idx].push(neighbor_pos);
724 break;
725 }
726 Err(observed) => {
727 current_bits = observed;
729 }
730 }
731 }
732 }
733 }
734 }
735 });
736
737 for node_pos in nodes_to_process_clone {
739 settled[node_pos].store(true, Ordering::Relaxed);
740 }
741
742 current_bucket += 1;
743 }
744
745 let mut result = HashMap::with_capacity(n);
747 for (i, &ni) in node_indices.iter().enumerate() {
748 let dist_bits = distances[i].load(Ordering::Relaxed);
749 let dist = f64::from_bits(dist_bits);
750 if dist != f64::INFINITY {
751 result.insert(ni, dist);
752 }
753 }
754
755 Ok(result)
756}
757
758#[cfg(test)]
759mod tests {
760 use super::*;
761 use crate::graph::builders::GraphBuilder;
762
763 #[test]
764 fn test_par_pagerank() {
765 let graph = GraphBuilder::directed()
766 .with_nodes(vec!["A", "B", "C"])
767 .with_edges(vec![(0, 1, 1.0), (1, 2, 1.0), (2, 0, 1.0)])
768 .build()
769 .unwrap();
770
771 let ranks = par_pagerank(&graph, 0.85, 20);
772 assert_eq!(ranks.len(), 3);
773
774 let values: Vec<_> = ranks.values().collect();
776 for i in 1..values.len() {
777 assert!((values[i] - values[0]).abs() < 0.01);
778 }
779 }
780
781 #[test]
782 fn test_par_connected_components() {
783 let graph = GraphBuilder::undirected()
784 .with_nodes(vec![1, 2, 3, 4, 5, 6])
785 .with_edges(vec![(0, 1, 1.0), (1, 2, 1.0), (3, 4, 1.0)])
786 .build()
787 .unwrap();
788
789 let components = par_connected_components(&graph);
790 assert_eq!(components.len(), 3); }
792
793 #[test]
794 fn test_par_degree_centrality() {
795 let graph = GraphBuilder::directed()
796 .with_nodes(vec!["A", "B", "C", "D"])
797 .with_edges(vec![(0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0)])
798 .build()
799 .unwrap();
800
801 let centrality = par_degree_centrality(&graph);
802 assert_eq!(centrality.len(), 4);
803 }
804
805 #[test]
806 fn test_par_dfs() {
807 use std::sync::atomic::AtomicUsize;
808
809 let graph = GraphBuilder::directed()
810 .with_nodes(vec!["A", "B", "C", "D", "E", "F"])
811 .with_edges(vec![
812 (0, 1, 1.0),
813 (0, 2, 1.0),
814 (0, 3, 1.0), (1, 4, 1.0), (2, 5, 1.0), ])
818 .build()
819 .unwrap();
820
821 let start = graph.nodes().next().unwrap().index();
822 let count = Arc::new(AtomicUsize::new(1)); let count_clone = count.clone();
824
825 par_dfs(&graph, start, move |_node| {
826 count_clone.fetch_add(1, Ordering::SeqCst);
827 true
828 });
829
830 assert_eq!(count.load(Ordering::SeqCst), 6);
832 }
833
834 #[test]
835 fn test_par_dijkstra_basic() {
836 let graph = GraphBuilder::directed()
837 .with_nodes(vec!["A", "B", "C", "D"])
838 .with_edges(vec![
839 (0, 1, 1.0),
840 (0, 2, 4.0),
841 (1, 2, 2.0),
842 (1, 3, 5.0),
843 (2, 3, 1.0),
844 ])
845 .build()
846 .unwrap();
847
848 let start = graph.nodes().next().unwrap().index();
849 let distances = par_dijkstra(&graph, start, |_, _, w| *w, 1.0).unwrap();
850
851 assert!(distances.contains_key(&start));
852 assert_eq!(distances.get(&start), Some(&0.0));
853 }
854
855 #[test]
856 fn test_par_dijkstra_single_node() {
857 let graph = GraphBuilder::directed()
858 .with_nodes(vec![1])
859 .build()
860 .unwrap();
861
862 let start = graph.nodes().next().unwrap().index();
863 let distances = par_dijkstra(&graph, start, |_, _, _: &f64| 1.0, 1.0).unwrap();
864
865 assert_eq!(distances.len(), 1);
866 assert_eq!(distances.get(&start), Some(&0.0));
867 }
868
869 #[test]
870 fn test_par_dijkstra_empty_graph() {
871 let graph: Graph<i32, f64> = GraphBuilder::directed().build().unwrap();
872 let distances =
873 par_dijkstra(&graph, NodeIndex::new(0, 1), |_, _, _: &f64| 1.0, 1.0).unwrap();
874 assert!(distances.is_empty());
875 }
876}