use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use crossbeam_queue::SegQueue;
use rayon::prelude::*;
use crate::errors::GraphResult;
use crate::graph::traits::{GraphBase, GraphQuery};
use crate::graph::Graph;
use crate::node::NodeIndex;
/// Traverses the part of `graph` reachable from `start`, exploring the
/// subtrees rooted at `start`'s neighbours in parallel.
///
/// `visitor` is invoked once for every node that is discovered *after*
/// `start`; it is never invoked for `start` itself, which is only marked as
/// visited. Returning `false` from `visitor` keeps the node marked as visited
/// but stops the traversal from descending through it.
///
/// Because subtrees are explored concurrently, the order in which nodes are
/// handed to `visitor` is not deterministic. An empty graph is a no-op.
///
/// # Panics
///
/// Panics if `start.index()` (or the index of any discovered node) is not
/// smaller than `graph.node_count()`.
pub fn par_dfs<T, F>(graph: &Graph<T, impl Clone + Send + Sync>, start: NodeIndex, visitor: F)
where
    T: Clone + Send + Sync,
    F: Fn(NodeIndex) -> bool + Send + Sync,
{
    let node_total = graph.node_count();
    if node_total == 0 {
        return;
    }

    // One flag per node slot; a successful false -> true swap "claims" a node.
    let seen: Arc<Vec<AtomicBool>> =
        Arc::new((0..node_total).map(|_| AtomicBool::new(false)).collect());
    seen[start.index()].store(true, Ordering::SeqCst);

    let roots: Vec<NodeIndex> = graph.neighbors(start).collect();
    roots.into_par_iter().for_each(|root| {
        let flag = &seen[root.index()];

        // Cheap read first so already-claimed nodes skip the atomic swap.
        if flag.load(Ordering::Relaxed) {
            return;
        }
        let claimed = flag
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok();
        if !claimed {
            return;
        }

        if visitor(root) {
            par_dfs_subtree(graph, root, &seen, &visitor);
        }
    });
}
/// Recursive worker for [`par_dfs`]: explores everything reachable from
/// `node` that has not been claimed by another branch yet.
///
/// All unclaimed neighbours of `node` are claimed up front (by flipping their
/// entry in `visited` from `false` to `true`), and only afterwards handed to
/// `visitor`. A neighbour is descended into only when `visitor` returns `true`
/// for it. When at least four neighbours were claimed they are processed in
/// parallel, otherwise sequentially in neighbour order.
///
/// `visited` must hold one flag per node index that can be encountered;
/// indexing past its end panics.
fn par_dfs_subtree<T, F>(
    graph: &Graph<T, impl Clone + Send + Sync>,
    node: NodeIndex,
    visited: &[AtomicBool],
    visitor: &F,
) where
    T: Clone + Send + Sync,
    F: Fn(NodeIndex) -> bool + Send + Sync,
{
    // Below this many freshly claimed neighbours the work is done on the
    // current thread instead of being split across the thread pool.
    const PARALLEL_FANOUT_THRESHOLD: usize = 4;

    let claimed: Vec<NodeIndex> = graph
        .neighbors(node)
        .filter(|&candidate| {
            let flag = &visited[candidate.index()];
            // Relaxed pre-check avoids the swap for nodes that are already taken.
            !flag.load(Ordering::Relaxed)
                && flag
                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok()
        })
        .collect();

    if claimed.is_empty() {
        return;
    }

    let descend = |child: NodeIndex| {
        if visitor(child) {
            par_dfs_subtree(graph, child, visited, visitor);
        }
    };

    if claimed.len() >= PARALLEL_FANOUT_THRESHOLD {
        claimed.into_par_iter().for_each(descend);
    } else {
        claimed.into_iter().for_each(descend);
    }
}
/// Runs `iterations` rounds of PageRank over `graph`, updating all nodes of a
/// round in parallel.
///
/// Every node starts with a score of `1 / n`. In each round a node receives
/// the base value `(1 - damping) / n` plus, for every incoming edge,
/// `damping * score(source) / out_degree(source)`. Sources whose out-degree
/// is reported as zero (or cannot be queried) contribute nothing.
///
/// Returns a map from node index to its final score; the map is empty for an
/// empty graph.
pub fn par_pagerank<T>(
    graph: &Graph<T, impl Clone + Send + Sync>,
    damping: f64,
    iterations: usize,
) -> HashMap<NodeIndex, f64>
where
    T: Clone + Send + Sync,
{
    let n = graph.node_count();
    if n == 0 {
        return HashMap::new();
    }

    // Dense positions (0..n) stand in for node indices in the hot loop.
    let node_indices: Vec<NodeIndex> = graph.nodes().map(|node| node.index()).collect();

    let mut node_to_pos: HashMap<NodeIndex, usize> = HashMap::new();
    for (pos, &ni) in node_indices.iter().enumerate() {
        node_to_pos.insert(ni, pos);
    }

    let mut out_degrees: Vec<usize> = Vec::with_capacity(node_indices.len());
    for &ni in &node_indices {
        out_degrees.push(graph.out_degree(ni).unwrap_or(0));
    }

    // incoming[p] lists the positions of all nodes with an edge into p.
    let mut incoming: Vec<Vec<usize>> = vec![Vec::new(); n];
    for edge in graph.edges() {
        let src = edge.source();
        let tgt = edge.target();
        match (node_to_pos.get(&src), node_to_pos.get(&tgt)) {
            (Some(&from), Some(&to)) => incoming[to].push(from),
            _ => {}
        }
    }

    let teleport = (1.0 - damping) / n as f64;
    let mut scores: Vec<f64> = vec![1.0 / n as f64; n];

    for _ in 0..iterations {
        let next: Vec<f64> = (0..n)
            .into_par_iter()
            .map(|pos| {
                incoming[pos]
                    .iter()
                    .filter(|&&from| out_degrees[from] > 0)
                    .fold(teleport, |rank, &from| {
                        rank + damping * scores[from] / out_degrees[from] as f64
                    })
            })
            .collect();
        scores = next;
    }

    node_indices
        .iter()
        .enumerate()
        .map(|(pos, &ni)| (ni, scores[pos]))
        .collect()
}
/// Variant of [`par_pagerank`] that accumulates incoming contributions four
/// at a time using `wide::f64x4`.
///
/// The update rule is the same as in `par_pagerank`: each round a node gets
/// `(1 - damping) / n` plus `damping * score(source) / out_degree(source)` for
/// every incoming edge, with zero-out-degree sources contributing nothing.
/// Incoming sources are consumed in groups of four (one SIMD multiply per
/// group, whose four lanes are then summed); any leftover sources are added
/// one by one.
///
/// Returns a map from node index to its final score; the map is empty for an
/// empty graph.
#[cfg(feature = "simd")]
pub fn par_pagerank_simd<T>(
    graph: &Graph<T, impl Clone + Send + Sync>,
    damping: f64,
    iterations: usize,
) -> HashMap<NodeIndex, f64>
where
    T: Clone + Send + Sync,
{
    use wide::f64x4;

    let n = graph.node_count();
    if n == 0 {
        return HashMap::new();
    }

    let node_indices: Vec<NodeIndex> = graph.nodes().map(|node| node.index()).collect();

    let mut node_to_pos: HashMap<NodeIndex, usize> = HashMap::new();
    for (pos, &ni) in node_indices.iter().enumerate() {
        node_to_pos.insert(ni, pos);
    }

    let mut out_degrees: Vec<usize> = Vec::with_capacity(node_indices.len());
    for &ni in &node_indices {
        out_degrees.push(graph.out_degree(ni).unwrap_or(0));
    }

    // incoming[p] lists the positions of all nodes with an edge into p.
    let mut incoming: Vec<Vec<usize>> = vec![Vec::new(); n];
    for edge in graph.edges() {
        let src = edge.source();
        let tgt = edge.target();
        match (node_to_pos.get(&src), node_to_pos.get(&tgt)) {
            (Some(&from), Some(&to)) => incoming[to].push(from),
            _ => {}
        }
    }

    // Reciprocal out-degree, or 0.0 so that sink nodes contribute nothing.
    let inv_out_degree = |pos: usize| -> f64 {
        let degree = out_degrees[pos];
        if degree > 0 {
            1.0 / degree as f64
        } else {
            0.0
        }
    };

    let base_rank = (1.0 - damping) / n as f64;
    let damping_simd = f64x4::new([damping; 4]);
    let mut scores: Vec<f64> = vec![1.0 / n as f64; n];

    for _ in 0..iterations {
        let next: Vec<f64> = (0..n)
            .into_par_iter()
            .map(|pos| {
                let mut rank = base_rank;
                let mut quads = incoming[pos].chunks_exact(4);

                // Vectorised part: four sources per step.
                for quad in quads.by_ref() {
                    let lane_scores = f64x4::new([
                        scores[quad[0]],
                        scores[quad[1]],
                        scores[quad[2]],
                        scores[quad[3]],
                    ]);
                    let lane_inv_degrees = f64x4::new([
                        inv_out_degree(quad[0]),
                        inv_out_degree(quad[1]),
                        inv_out_degree(quad[2]),
                        inv_out_degree(quad[3]),
                    ]);
                    let lanes: [f64; 4] = (damping_simd * lane_scores * lane_inv_degrees).into();
                    rank += lanes[0] + lanes[1] + lanes[2] + lanes[3];
                }

                // Scalar tail for the up-to-three sources that did not fill a group.
                for &from in quads.remainder() {
                    let degree = out_degrees[from];
                    if degree > 0 {
                        rank += damping * scores[from] / degree as f64;
                    }
                }

                rank
            })
            .collect();
        scores = next;
    }

    node_indices
        .iter()
        .enumerate()
        .map(|(pos, &ni)| (ni, scores[pos]))
        .collect()
}
/// Computes out-degree centrality for every node, available when the `simd`
/// feature is enabled.
///
/// The centrality of a node is `out_degree / (n - 1)`, where `n` is the number
/// of nodes. A node whose out-degree cannot be queried counts as degree 0.
/// Graphs with fewer than two nodes yield an empty map.
///
/// NOTE(review): despite the name, this body contains no explicit SIMD code;
/// it computes the same values as `par_degree_centrality`.
#[cfg(feature = "simd")]
pub fn par_degree_centrality_simd<T>(
    graph: &Graph<T, impl Clone + Send + Sync>,
) -> HashMap<NodeIndex, f64>
where
    T: Clone + Send + Sync,
{
    let node_total = graph.node_count();
    if node_total < 2 {
        return HashMap::new();
    }

    let ids: Vec<NodeIndex> = graph.nodes().map(|node| node.index()).collect();
    let scale = 1.0 / (node_total - 1) as f64;

    // Values are computed in parallel, in the same order as `ids`.
    let values: Vec<f64> = ids
        .par_iter()
        .map(|&ni| graph.out_degree(ni).unwrap_or(0) as f64 * scale)
        .collect();

    let mut centrality = HashMap::with_capacity(ids.len());
    for (ni, value) in ids.into_iter().zip(values) {
        centrality.insert(ni, value);
    }
    centrality
}
/// Breadth-first traversal from `start` that expands each layer in parallel.
///
/// `visitor` is called as `visitor(node, depth)` for every node of a layer,
/// starting with `(start, 0)`. When it returns `false` the node's neighbours
/// are not explored through that node (they may still be reached through
/// another one). Nodes within a layer are visited concurrently, so the call
/// order inside a layer is not deterministic.
///
/// An empty graph is a no-op, matching `par_dfs`.
///
/// # Panics
///
/// Panics if `start.index()` (or the index of any discovered node) is not
/// smaller than `graph.node_count()`.
pub fn par_bfs<T, F>(graph: &Graph<T, impl Clone + Send + Sync>, start: NodeIndex, visitor: F)
where
    T: Clone + Send + Sync,
    F: Fn(NodeIndex, usize) -> bool + Send + Sync,
{
    let n = graph.node_count();
    // Without this guard the `visited[start.index()]` access below would
    // panic on an empty graph.
    if n == 0 {
        return;
    }

    let visited: Arc<Vec<AtomicBool>> =
        Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
    visited[start.index()].store(true, Ordering::SeqCst);

    let mut current_layer = vec![start];
    let mut depth = 0;

    while !current_layer.is_empty() {
        // For each node of the layer: the neighbours it was first to claim.
        let discovered: Vec<Vec<NodeIndex>> = current_layer
            .par_iter()
            .map(|&node| {
                if !visitor(node, depth) {
                    return Vec::new();
                }
                graph
                    .neighbors(node)
                    .filter(|&neighbor| {
                        let flag = &visited[neighbor.index()];
                        // Relaxed pre-check, then an atomic false -> true swap
                        // so exactly one parent claims each neighbour.
                        !flag.load(Ordering::Relaxed)
                            && flag
                                .compare_exchange(
                                    false,
                                    true,
                                    Ordering::SeqCst,
                                    Ordering::Relaxed,
                                )
                                .is_ok()
                    })
                    .collect()
            })
            .collect();

        current_layer = discovered.into_iter().flatten().collect();
        depth += 1;
    }
}
/// Groups the nodes of `graph` into connected components, treating every edge
/// as undirected, using a lock-free union-find that processes edges in
/// parallel.
///
/// Returns one `Vec<NodeIndex>` per component. The order of the components
/// and of the nodes between components is unspecified. An empty graph yields
/// an empty vector. Edges whose endpoints are not among `graph.nodes()` are
/// ignored.
pub fn par_connected_components<T>(
    graph: &Graph<T, impl Clone + Send + Sync>,
) -> Vec<Vec<NodeIndex>>
where
    T: Clone + Send + Sync,
{
    // Follows parent links until reaching a slot that is its own parent.
    // Links always point from a smaller to a larger position, so this ends.
    fn find(parent: &[AtomicUsize], mut i: usize) -> usize {
        loop {
            let p = parent[i].load(Ordering::Relaxed);
            if p == i {
                return p;
            }
            i = p;
        }
    }

    // Merges the sets containing `i` and `j`. The smaller root is linked
    // under the larger one. If another thread re-links that root first, the
    // compare-exchange fails and the union is retried from the new roots
    // instead of being dropped.
    fn union_atomic(parent: &[AtomicUsize], i: usize, j: usize) {
        let (mut a, mut b) = (i, j);
        loop {
            a = find(parent, a);
            b = find(parent, b);
            if a == b {
                return;
            }
            let (child, root) = if a < b { (a, b) } else { (b, a) };
            let linked = parent[child]
                .compare_exchange(child, root, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok();
            if linked {
                return;
            }
        }
    }

    let node_indices: Vec<NodeIndex> = graph.nodes().map(|node| node.index()).collect();
    if node_indices.is_empty() {
        return Vec::new();
    }

    // Dense positions keep `parent` exactly `node_indices.len()` long, no
    // matter what raw values `NodeIndex::index()` reports.
    let node_to_pos: HashMap<NodeIndex, usize> = node_indices
        .iter()
        .enumerate()
        .map(|(pos, &ni)| (ni, pos))
        .collect();

    // Every node starts as the root of its own singleton set.
    let parent: Vec<AtomicUsize> = (0..node_indices.len()).map(AtomicUsize::new).collect();

    // Union is symmetric, so each edge needs to be recorded only once.
    let edges: Vec<(usize, usize)> = graph
        .edges()
        .filter_map(|edge| {
            let src = *node_to_pos.get(&edge.source())?;
            let tgt = *node_to_pos.get(&edge.target())?;
            Some((src, tgt))
        })
        .collect();

    edges.par_iter().for_each(|&(src, tgt)| {
        union_atomic(&parent, src, tgt);
    });

    let mut components_map: HashMap<usize, Vec<NodeIndex>> = HashMap::new();
    for (pos, &node) in node_indices.iter().enumerate() {
        components_map
            .entry(find(&parent, pos))
            .or_default()
            .push(node);
    }
    components_map.into_values().collect()
}
/// Computes out-degree centrality for every node in parallel.
///
/// The centrality of a node is `out_degree / (n - 1)`, where `n` is the number
/// of nodes. A node whose out-degree cannot be queried counts as degree 0.
/// Graphs with fewer than two nodes yield an empty map.
pub fn par_degree_centrality<T>(
    graph: &Graph<T, impl Clone + Send + Sync>,
) -> HashMap<NodeIndex, f64>
where
    T: Clone + Send + Sync,
{
    let node_total = graph.node_count();
    if node_total < 2 {
        return HashMap::new();
    }

    // Normalising by n - 1: the largest out-degree a node can have without
    // parallel edges or self-loops.
    let scale = 1.0 / (node_total - 1) as f64;
    let ids: Vec<NodeIndex> = graph.nodes().map(|node| node.index()).collect();

    ids.par_iter()
        .map(|&ni| {
            let out_degree = graph.out_degree(ni).unwrap_or(0);
            (ni, out_degree as f64 * scale)
        })
        .collect()
}
pub fn par_dijkstra<T, E, F>(
graph: &Graph<T, E>,
source: NodeIndex,
get_weight: F,
delta: f64,
) -> GraphResult<HashMap<NodeIndex, f64>>
where
T: Clone + Send + Sync,
E: Clone + Send + Sync,
F: Fn(NodeIndex, NodeIndex, &E) -> f64 + Send + Sync,
{
let n = graph.node_count();
if n == 0 {
return Ok(HashMap::new());
}
let node_indices: Vec<NodeIndex> = graph.nodes().map(|n| n.index()).collect();
let node_to_pos: HashMap<NodeIndex, usize> = node_indices
.iter()
.enumerate()
.map(|(i, &ni)| (ni, i))
.collect();
let distances: Vec<AtomicU64> = (0..n)
.map(|_| AtomicU64::new(f64::INFINITY.to_bits()))
.collect();
if let Some(&source_pos) = node_to_pos.get(&source) {
distances[source_pos].store(0.0_f64.to_bits(), Ordering::Relaxed);
}
let buckets: Vec<SegQueue<usize>> = (0..10000).map(|_| SegQueue::new()).collect();
let buckets = Arc::new(buckets);
if let Some(&source_pos) = node_to_pos.get(&source) {
buckets[0].push(source_pos);
}
let settled: Vec<AtomicBool> = (0..n).map(|_| AtomicBool::new(false)).collect();
let mut current_bucket = 0;
let mut empty_count = 0;
let max_empty_buckets = 100;
loop {
let mut nodes_to_process: Vec<usize> = Vec::new();
while let Some(node_pos) = buckets[current_bucket].pop() {
nodes_to_process.push(node_pos);
}
if nodes_to_process.is_empty() {
empty_count += 1;
if empty_count >= max_empty_buckets {
break;
}
current_bucket += 1;
continue;
}
empty_count = 0;
let nodes_to_process_clone = nodes_to_process.clone();
nodes_to_process.into_par_iter().for_each(|node_pos| {
if settled[node_pos].load(Ordering::Relaxed) {
return;
}
let node = node_indices[node_pos];
let node_dist = {
let dist_bits = distances[node_pos].load(Ordering::Relaxed);
f64::from_bits(dist_bits)
};
for neighbor in graph.neighbors(node) {
if let Some(&neighbor_pos) = node_to_pos.get(&neighbor) {
if settled[neighbor_pos].load(Ordering::Relaxed) {
continue;
}
if let Ok(edge_data) = graph.get_edge_by_nodes(node, neighbor) {
let weight = get_weight(node, neighbor, edge_data);
let new_dist = node_dist + weight;
let mut current_bits = distances[neighbor_pos].load(Ordering::Relaxed);
loop {
let current_dist = f64::from_bits(current_bits);
if new_dist >= current_dist {
break;
}
let new_bits = new_dist.to_bits();
match distances[neighbor_pos].compare_exchange(
current_bits,
new_bits,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => {
let bucket_idx = ((new_dist / delta).floor() as usize)
.saturating_add(1)
.min(buckets.len() - 1);
buckets[bucket_idx].push(neighbor_pos);
break;
}
Err(observed) => {
current_bits = observed;
}
}
}
}
}
}
});
for node_pos in nodes_to_process_clone {
settled[node_pos].store(true, Ordering::Relaxed);
}
current_bucket += 1;
}
let mut result = HashMap::with_capacity(n);
for (i, &ni) in node_indices.iter().enumerate() {
let dist_bits = distances[i].load(Ordering::Relaxed);
let dist = f64::from_bits(dist_bits);
if dist != f64::INFINITY {
result.insert(ni, dist);
}
}
Ok(result)
}
// Unit tests for the parallel graph algorithms in this file. Graphs are built
// with `GraphBuilder`; edge tuples are presumably (source position, target
// position, weight) - confirm against the builder's documentation.
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::builders::GraphBuilder;
// Directed 3-cycle: one score per node, and all scores within 0.01 of each other.
#[test]
fn test_par_pagerank() {
let graph = GraphBuilder::directed()
.with_nodes(vec!["A", "B", "C"])
.with_edges(vec![(0, 1, 1.0), (1, 2, 1.0), (2, 0, 1.0)])
.build()
.unwrap();
let ranks = par_pagerank(&graph, 0.85, 20);
assert_eq!(ranks.len(), 3);
let values: Vec<_> = ranks.values().collect();
for i in 1..values.len() {
assert!((values[i] - values[0]).abs() < 0.01);
}
}
// Six nodes and three edges; the test expects exactly three components.
#[test]
fn test_par_connected_components() {
let graph = GraphBuilder::undirected()
.with_nodes(vec![1, 2, 3, 4, 5, 6])
.with_edges(vec![(0, 1, 1.0), (1, 2, 1.0), (3, 4, 1.0)])
.build()
.unwrap();
let components = par_connected_components(&graph);
assert_eq!(components.len(), 3); }
// Only checks that every one of the four nodes receives a centrality entry.
#[test]
fn test_par_degree_centrality() {
let graph = GraphBuilder::directed()
.with_nodes(vec!["A", "B", "C", "D"])
.with_edges(vec![(0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0)])
.build()
.unwrap();
let centrality = par_degree_centrality(&graph);
assert_eq!(centrality.len(), 4);
}
// Counts visitor calls on a six-node graph traversed from the first node.
#[test]
fn test_par_dfs() {
use std::sync::atomic::AtomicUsize;
let graph = GraphBuilder::directed()
.with_nodes(vec!["A", "B", "C", "D", "E", "F"])
.with_edges(vec![
(0, 1, 1.0),
(0, 2, 1.0),
(0, 3, 1.0), (1, 4, 1.0), (2, 5, 1.0), ])
.build()
.unwrap();
let start = graph.nodes().next().unwrap().index();
// The counter is seeded with 1 because `par_dfs` never calls the visitor
// for the start node itself.
let count = Arc::new(AtomicUsize::new(1)); let count_clone = count.clone();
par_dfs(&graph, start, move |_node| {
count_clone.fetch_add(1, Ordering::SeqCst);
true
});
assert_eq!(count.load(Ordering::SeqCst), 6);
}
// Uses the edge weight itself as the cost; only the source's distance (0.0)
// is asserted.
#[test]
fn test_par_dijkstra_basic() {
let graph = GraphBuilder::directed()
.with_nodes(vec!["A", "B", "C", "D"])
.with_edges(vec![
(0, 1, 1.0),
(0, 2, 4.0),
(1, 2, 2.0),
(1, 3, 5.0),
(2, 3, 1.0),
])
.build()
.unwrap();
let start = graph.nodes().next().unwrap().index();
let distances = par_dijkstra(&graph, start, |_, _, w| *w, 1.0).unwrap();
assert!(distances.contains_key(&start));
assert_eq!(distances.get(&start), Some(&0.0));
}
// A graph with a single node and no edges: the result holds only the source at 0.0.
#[test]
fn test_par_dijkstra_single_node() {
let graph = GraphBuilder::directed()
.with_nodes(vec![1])
.build()
.unwrap();
let start = graph.nodes().next().unwrap().index();
let distances = par_dijkstra(&graph, start, |_, _, _: &f64| 1.0, 1.0).unwrap();
assert_eq!(distances.len(), 1);
assert_eq!(distances.get(&start), Some(&0.0));
}
// An empty graph yields an empty distance map, whatever source index is passed.
#[test]
fn test_par_dijkstra_empty_graph() {
let graph: Graph<i32, f64> = GraphBuilder::directed().build().unwrap();
let distances =
par_dijkstra(&graph, NodeIndex::new(0, 1), |_, _, _: &f64| 1.0, 1.0).unwrap();
assert!(distances.is_empty());
}
}