#[cfg(feature = "parallel")]
use scirs2_core::parallel_ops::*;
use crate::compressed::CsrGraph;
use crate::error::{GraphError, Result};
/// Outcome of a breadth-first search started from a single source node.
///
/// The search routines in this file mark nodes they never reach with the
/// sentinel `usize::MAX` in both `distances` and `parents`.
#[derive(Clone, Debug)]
pub struct BfsResult {
    /// Hop count from the source to each node, indexed by node id
    /// (`usize::MAX` for nodes that were not reached).
    pub distances: Vec<usize>,
    /// BFS-tree parent of each node, indexed by node id (`usize::MAX` for the
    /// source itself and for nodes that were not reached).
    pub parents: Vec<usize>,
    /// Number of nodes reached, the source included.
    pub num_visited: usize,
    /// Depth of the deepest level that contained at least one node
    /// (0 when only the source was reached).
    pub num_levels: usize,
}
/// Level-synchronous breadth-first search whose frontier expansion runs in
/// parallel.
///
/// Each level is processed in two phases:
/// 1. every frontier node scans its neighbors in parallel and reports the
///    `(neighbor, parent)` pairs that are still unvisited (read-only phase);
/// 2. the reported pairs are merged sequentially, in frontier order, so a node
///    reported by several frontier nodes is claimed by the first one only.
///
/// # Errors
///
/// Returns the error built by `GraphError::node_not_found_with_context` when
/// `source` is not smaller than `graph.num_nodes()`.
#[cfg(feature = "parallel")]
pub fn parallel_bfs(graph: &CsrGraph, source: usize) -> Result<BfsResult> {
    // Sentinel stored for nodes the search has not reached.
    const UNREACHED: usize = usize::MAX;

    let node_count = graph.num_nodes();
    if source >= node_count {
        return Err(GraphError::node_not_found_with_context(
            source,
            node_count,
            "parallel_bfs source",
        ));
    }

    let mut distances = vec![UNREACHED; node_count];
    let mut parents = vec![UNREACHED; node_count];
    distances[source] = 0;

    let mut visited = 1;
    let mut depth = 0;
    let mut frontier = vec![source];

    while !frontier.is_empty() {
        let level = depth + 1;

        // Phase 1: read-only parallel scan; `distances` is only inspected here.
        let candidates: Vec<Vec<(usize, usize)>> = frontier
            .par_iter()
            .map(|&node| {
                graph
                    .neighbors(node)
                    .filter(|&(neighbor, _)| distances[neighbor] == UNREACHED)
                    .map(|(neighbor, _)| (neighbor, node))
                    .collect::<Vec<(usize, usize)>>()
            })
            .collect();

        // Phase 2: sequential merge; re-check because the same neighbor can be
        // reported by more than one frontier node.
        let mut upcoming = Vec::new();
        for (neighbor, parent) in candidates.into_iter().flatten() {
            if distances[neighbor] != UNREACHED {
                continue;
            }
            distances[neighbor] = level;
            parents[neighbor] = parent;
            upcoming.push(neighbor);
            visited += 1;
        }

        // Only count the level if it actually contains nodes.
        if !upcoming.is_empty() {
            depth = level;
        }
        frontier = upcoming;
    }

    Ok(BfsResult {
        distances,
        parents,
        num_visited: visited,
        num_levels: depth,
    })
}
/// Sequential breadth-first search from `source`.
///
/// Nodes are expanded level by level; the first frontier node that sees an
/// unvisited neighbor becomes that neighbor's parent. Unreached nodes keep
/// `usize::MAX` in both `distances` and `parents`.
///
/// # Errors
///
/// Returns the error built by `GraphError::node_not_found_with_context` when
/// `source` is not smaller than `graph.num_nodes()`.
pub fn bfs(graph: &CsrGraph, source: usize) -> Result<BfsResult> {
    // Sentinel stored for nodes the search has not reached.
    const UNREACHED: usize = usize::MAX;

    let node_count = graph.num_nodes();
    if source >= node_count {
        return Err(GraphError::node_not_found_with_context(
            source,
            node_count,
            "bfs source",
        ));
    }

    let mut distances = vec![UNREACHED; node_count];
    let mut parents = vec![UNREACHED; node_count];
    distances[source] = 0;

    let mut visited = 1;
    let mut depth = 0;
    let mut current = vec![source];

    // `current` is never empty at the top of the loop: it starts with the
    // source and the loop exits as soon as a level discovers nothing new.
    loop {
        let level = depth + 1;
        let mut upcoming = Vec::new();
        for &node in &current {
            for (neighbor, _) in graph.neighbors(node) {
                if distances[neighbor] != UNREACHED {
                    continue;
                }
                distances[neighbor] = level;
                parents[neighbor] = node;
                upcoming.push(neighbor);
                visited += 1;
            }
        }
        if upcoming.is_empty() {
            break;
        }
        depth = level;
        current = upcoming;
    }

    Ok(BfsResult {
        distances,
        parents,
        num_visited: visited,
        num_levels: depth,
    })
}
/// Outcome of a connected-components computation.
#[derive(Clone, Debug)]
pub struct ComponentsResult {
    /// Component id of each node, indexed by node id. The routines in this
    /// file produce ids in `0..num_components`.
    pub labels: Vec<usize>,
    /// Number of distinct components found.
    pub num_components: usize,
    /// Number of nodes in each component, indexed by component id.
    pub component_sizes: Vec<usize>,
}
/// Disjoint-set forest over `0..n` whose parent links are atomics, so `find`
/// and `union` take `&self` and can be called from several threads at once.
///
/// A node is a root exactly when it is its own parent. Roots are always linked
/// from the larger index to the smaller one, so the root of a set is its
/// smallest member and links can never form a cycle.
struct UnionFind {
    /// `parent[i]` is the current parent of node `i`.
    parent: Vec<std::sync::atomic::AtomicUsize>,
}

impl UnionFind {
    /// Creates `n` singleton sets: every node starts as its own parent.
    fn new(n: usize) -> Self {
        let mut parent = Vec::with_capacity(n);
        for node in 0..n {
            parent.push(std::sync::atomic::AtomicUsize::new(node));
        }
        Self { parent }
    }

    /// Returns the root of the set containing `x`, shortening the path on the
    /// way by re-pointing each visited node at its grandparent.
    fn find(&self, x: usize) -> usize {
        use std::sync::atomic::Ordering::Relaxed;

        let mut current = x;
        let mut up = self.parent[current].load(Relaxed);
        while up != current {
            let grand = self.parent[up].load(Relaxed);
            // Best-effort shortcut: if another thread changed the link in the
            // meantime (or the weak CAS fails spuriously) we simply move on.
            let _ = self.parent[current].compare_exchange_weak(up, grand, Relaxed, Relaxed);
            current = grand;
            up = self.parent[current].load(Relaxed);
        }
        current
    }

    /// Merges the sets containing `x` and `y`.
    ///
    /// Retries until the larger root has been linked under the smaller one or
    /// both elements are observed to share a root already.
    fn union(&self, x: usize, y: usize) {
        use std::sync::atomic::Ordering::Relaxed;

        loop {
            let root_x = self.find(x);
            let root_y = self.find(y);
            if root_x == root_y {
                break;
            }
            let child = root_x.max(root_y);
            let new_parent = root_x.min(root_y);
            // The CAS only succeeds while `child` is still a root; otherwise
            // the roots are stale and must be recomputed.
            let linked = self.parent[child]
                .compare_exchange_weak(child, new_parent, Relaxed, Relaxed)
                .is_ok();
            if linked {
                break;
            }
        }
    }
}
/// Connected components computed with a concurrent union-find.
///
/// Every node unions itself with each of its neighbors in parallel; afterwards
/// each node's root is looked up (also in parallel) and the roots are
/// compacted into consecutive component ids by `relabel_components`.
///
/// An empty graph yields an empty result with zero components.
#[cfg(feature = "parallel")]
pub fn parallel_connected_components(graph: &CsrGraph) -> ComponentsResult {
    let node_count = graph.num_nodes();
    if node_count == 0 {
        return ComponentsResult {
            labels: Vec::new(),
            num_components: 0,
            component_sizes: Vec::new(),
        };
    }

    let forest = UnionFind::new(node_count);

    // Edge pass: merge the endpoints of every adjacency entry.
    (0..node_count).into_par_iter().for_each(|node| {
        for (neighbor, _) in graph.neighbors(node) {
            forest.union(node, neighbor);
        }
    });

    // Root pass: all unions are finished, so each lookup returns the final root.
    let roots: Vec<usize> = (0..node_count)
        .into_par_iter()
        .map(|node| forest.find(node))
        .collect();

    relabel_components(&roots)
}
/// Connected components computed by repeated sequential BFS.
///
/// Nodes are scanned in increasing id order; each still-unlabeled node seeds a
/// new component, and everything reachable from it through `graph.neighbors`
/// receives the same id. Component ids are therefore assigned in order of each
/// component's first-scanned node.
///
/// NOTE(review): reachability follows adjacency entries as stored; on a graph
/// whose adjacency is one-directional this is not weak connectivity — confirm
/// the intended `CsrGraph` semantics.
pub fn connected_components(graph: &CsrGraph) -> ComponentsResult {
    // Marker for nodes that have not been assigned to a component yet.
    const UNLABELED: usize = usize::MAX;

    let node_count = graph.num_nodes();
    if node_count == 0 {
        return ComponentsResult {
            labels: Vec::new(),
            num_components: 0,
            component_sizes: Vec::new(),
        };
    }

    let mut labels = vec![UNLABELED; node_count];
    let mut component_sizes: Vec<usize> = Vec::new();

    for seed in 0..node_count {
        if labels[seed] != UNLABELED {
            continue;
        }

        // The next free id equals the number of components recorded so far.
        let id = component_sizes.len();
        labels[seed] = id;

        // `members` doubles as the BFS queue (read through `cursor`) and as
        // the list of every node labeled with `id`.
        let mut members = vec![seed];
        let mut cursor = 0;
        while let Some(&node) = members.get(cursor) {
            cursor += 1;
            for (neighbor, _) in graph.neighbors(node) {
                if labels[neighbor] == UNLABELED {
                    labels[neighbor] = id;
                    members.push(neighbor);
                }
            }
        }

        // Each node enters `members` exactly once, when it is labeled.
        component_sizes.push(members.len());
    }

    let num_components = component_sizes.len();
    ComponentsResult {
        labels,
        num_components,
        component_sizes,
    }
}
/// Compacts arbitrary per-node labels into consecutive component ids.
///
/// Distinct values of `raw_labels` are numbered `0, 1, 2, …` in order of first
/// appearance; the result carries the renumbered labels, the number of
/// distinct values, and how many entries map to each id.
fn relabel_components(raw_labels: &[usize]) -> ComponentsResult {
    let mut dense_ids: std::collections::HashMap<usize, usize> =
        std::collections::HashMap::new();
    let mut component_sizes: Vec<usize> = Vec::new();

    let labels: Vec<usize> = raw_labels
        .iter()
        .map(|&root| {
            // A previously unseen root receives the next unused id, which is
            // the number of components recorded so far.
            let fresh = component_sizes.len();
            let id = *dense_ids.entry(root).or_insert(fresh);
            if id == fresh {
                component_sizes.push(1);
            } else {
                component_sizes[id] += 1;
            }
            id
        })
        .collect();

    let num_components = component_sizes.len();
    ComponentsResult {
        labels,
        num_components,
        component_sizes,
    }
}
/// Tunable parameters for the PageRank iterations in this file.
#[derive(Clone, Debug)]
pub struct PageRankConfig {
    /// Damping factor: the weight given to score flowing along edges, with
    /// `1 - damping` spread uniformly over all nodes.
    pub damping: f64,
    /// Upper bound on the number of iterations performed.
    pub max_iterations: usize,
    /// Iteration stops once the summed absolute score change of an iteration
    /// is no larger than this value.
    pub tolerance: f64,
}

impl Default for PageRankConfig {
    /// Damping `0.85`, at most `100` iterations, tolerance `1e-6`.
    fn default() -> Self {
        const DEFAULT_DAMPING: f64 = 0.85;
        const DEFAULT_MAX_ITERATIONS: usize = 100;
        const DEFAULT_TOLERANCE: f64 = 1e-6;

        PageRankConfig {
            damping: DEFAULT_DAMPING,
            max_iterations: DEFAULT_MAX_ITERATIONS,
            tolerance: DEFAULT_TOLERANCE,
        }
    }
}
/// Outcome of a PageRank computation.
#[derive(Clone, Debug)]
pub struct PageRankResult {
    /// Final score of each node, indexed by node id.
    pub scores: Vec<f64>,
    /// Number of iterations that were actually executed.
    pub iterations: usize,
    /// Summed absolute score change measured in the last executed iteration.
    pub residual: f64,
    /// Whether `residual` ended up no larger than the configured tolerance.
    pub converged: bool,
}
/// PageRank by power iteration, with the element-wise steps run in parallel.
///
/// Each iteration:
/// 1. sums the scores of nodes without outgoing edges (in parallel) and folds
///    that mass, together with the `1 - damping` teleport share, into a
///    per-node base value;
/// 2. pushes `score / degree` from every node to each of its neighbors —
///    this scatter step is sequential;
/// 3. combines base value and damped incoming mass into the new scores and
///    measures the summed absolute change (both in parallel).
///
/// Edge weights are ignored. Iteration stops after `config.max_iterations`
/// rounds or once the change is no larger than `config.tolerance`.
///
/// An empty graph returns an empty, converged result. This function never
/// returns `Err` as written.
#[cfg(feature = "parallel")]
pub fn parallel_pagerank(graph: &CsrGraph, config: &PageRankConfig) -> Result<PageRankResult> {
    let node_count = graph.num_nodes();
    if node_count == 0 {
        return Ok(PageRankResult {
            scores: Vec::new(),
            iterations: 0,
            residual: 0.0,
            converged: true,
        });
    }

    let d = config.damping;
    let inv_n = 1.0 / node_count as f64;
    // NOTE(review): `graph.degree` is used as the out-degree — confirm.
    let out_degree: Vec<usize> = (0..node_count)
        .into_par_iter()
        .map(|node| graph.degree(node))
        .collect();

    let mut scores = vec![inv_n; node_count];
    let mut incoming = vec![0.0f64; node_count];
    let mut iterations = 0;
    let mut residual = f64::MAX;

    while iterations < config.max_iterations && residual > config.tolerance {
        iterations += 1;

        // Score held by dangling nodes is redistributed uniformly.
        let dangling_sum: f64 = (0..node_count)
            .into_par_iter()
            .filter(|&node| out_degree[node] == 0)
            .map(|node| scores[node])
            .sum();
        let teleport = (1.0 - d) * inv_n + d * dangling_sum * inv_n;

        // Sequential scatter of each node's score over its neighbors.
        incoming.fill(0.0);
        for (src, &degree) in out_degree.iter().enumerate() {
            if degree == 0 {
                continue;
            }
            let share = scores[src] / degree as f64;
            for (dst, _) in graph.neighbors(src) {
                incoming[dst] += share;
            }
        }

        let updated: Vec<f64> = incoming
            .par_iter()
            .map(|&mass| teleport + d * mass)
            .collect();

        residual = scores
            .par_iter()
            .zip(updated.par_iter())
            .map(|(&before, &after)| (before - after).abs())
            .sum();

        scores = updated;
    }

    let converged = residual <= config.tolerance;
    Ok(PageRankResult {
        scores,
        iterations,
        residual,
        converged,
    })
}
pub fn pagerank(graph: &CsrGraph, config: &PageRankConfig) -> Result<PageRankResult> {
let n = graph.num_nodes();
if n == 0 {
return Ok(PageRankResult {
scores: vec![],
iterations: 0,
residual: 0.0,
converged: true,
});
}
let d = config.damping;
let inv_n = 1.0 / n as f64;
let out_degree: Vec<usize> = (0..n).map(|i| graph.degree(i)).collect();
let mut scores = vec![inv_n; n];
let mut new_scores = vec![0.0f64; n];
let mut iterations = 0;
let mut residual = f64::MAX;
while iterations < config.max_iterations && residual > config.tolerance {
iterations += 1;
let dangling_sum: f64 = (0..n)
.filter(|&i| out_degree[i] == 0)
.map(|i| scores[i])
.sum();
let teleport = (1.0 - d) * inv_n + d * dangling_sum * inv_n;
new_scores.iter_mut().for_each(|x| *x = 0.0);
for src in 0..n {
if out_degree[src] == 0 {
continue;
}
let contrib = scores[src] / out_degree[src] as f64;
for (dst, _) in graph.neighbors(src) {
new_scores[dst] += contrib;
}
}
residual = 0.0;
for i in 0..n {
let new_val = teleport + d * new_scores[i];
residual += (scores[i] - new_val).abs();
scores[i] = new_val;
}
}
Ok(PageRankResult {
scores,
iterations,
residual,
converged: residual <= config.tolerance,
})
}
/// Outcome of a triangle-counting pass.
#[derive(Clone, Debug)]
pub struct TriangleCountResult {
    /// Number of distinct triangles found in the graph.
    pub total_triangles: usize,
    /// Number of triangles each node takes part in, indexed by node id.
    pub per_node_triangles: Vec<usize>,
}
/// Counts triangles with a single parallel pass over all nodes.
///
/// Every triangle `u < v < w` is discovered exactly once, from its smallest
/// vertex `u`: for each neighbor `v > u`, the neighbor lists of `u` and `v`
/// are merged and every common neighbor `w > v` closes one triangle.
///
/// Per-node counts are accumulated in atomic counters during that same
/// parallel pass, so the enumeration is not repeated sequentially afterwards.
///
/// Graphs with fewer than three nodes contain no triangle and return all-zero
/// counts without touching the adjacency data.
///
/// NOTE(review): the merge-style intersection assumes `graph.neighbors` yields
/// neighbor ids in ascending order without duplicates — confirm against
/// `CsrGraph`.
#[cfg(feature = "parallel")]
pub fn parallel_triangle_count(graph: &CsrGraph) -> TriangleCountResult {
    let n = graph.num_nodes();
    if n < 3 {
        return TriangleCountResult {
            total_triangles: 0,
            per_node_triangles: vec![0; n],
        };
    }

    // Relaxed is sufficient: the counters are independent tallies that are
    // only read after the parallel iterator has completed.
    let relaxed = std::sync::atomic::Ordering::Relaxed;
    let counters: Vec<std::sync::atomic::AtomicUsize> = (0..n)
        .map(|_| std::sync::atomic::AtomicUsize::new(0))
        .collect();

    let total: usize = (0..n)
        .into_par_iter()
        .map(|u| {
            let neighbors_u: Vec<usize> = graph.neighbors(u).map(|(v, _)| v).collect();
            // Triangles whose smallest vertex is `u`.
            let mut found_from_u = 0usize;

            for &v in &neighbors_u {
                // Only walk each edge from its smaller endpoint.
                if v <= u {
                    continue;
                }
                let neighbors_v: Vec<usize> = graph.neighbors(v).map(|(w, _)| w).collect();
                // Triangles closed over the edge (u, v).
                let mut found_on_edge = 0usize;

                let mut iu = 0;
                let mut iv = 0;
                while iu < neighbors_u.len() && iv < neighbors_v.len() {
                    let nu = neighbors_u[iu];
                    let nv = neighbors_v[iv];
                    if nu <= v {
                        // Third vertex must be larger than `v`.
                        iu += 1;
                    } else if nv <= v {
                        iv += 1;
                    } else if nu == nv {
                        // Common neighbor `w = nu` closes the triangle (u, v, w).
                        counters[nu].fetch_add(1, relaxed);
                        found_on_edge += 1;
                        iu += 1;
                        iv += 1;
                    } else if nu < nv {
                        iu += 1;
                    } else {
                        iv += 1;
                    }
                }

                // Batch the updates for `v` and `u` to limit atomic traffic.
                if found_on_edge > 0 {
                    counters[v].fetch_add(found_on_edge, relaxed);
                    found_from_u += found_on_edge;
                }
            }

            if found_from_u > 0 {
                counters[u].fetch_add(found_from_u, relaxed);
            }
            found_from_u
        })
        .sum();

    let per_node_triangles: Vec<usize> = counters
        .into_iter()
        .map(std::sync::atomic::AtomicUsize::into_inner)
        .collect();

    TriangleCountResult {
        total_triangles: total,
        per_node_triangles,
    }
}
/// Sequential triangle counting.
///
/// Every triangle `u < v < w` is discovered exactly once, from its smallest
/// vertex `u`: for each neighbor `v > u`, the neighbor lists of `u` and `v`
/// are merged and every common neighbor `w > v` closes one triangle, which is
/// credited to all three of its vertices.
///
/// Graphs with fewer than three nodes return all-zero counts without touching
/// the adjacency data.
///
/// NOTE(review): the merge-style intersection assumes `graph.neighbors` yields
/// neighbor ids in ascending order without duplicates — confirm against
/// `CsrGraph`.
pub fn triangle_count(graph: &CsrGraph) -> TriangleCountResult {
    let node_count = graph.num_nodes();
    let mut per_node_triangles = vec![0usize; node_count];
    let mut total_triangles = 0usize;

    if node_count >= 3 {
        for u in 0..node_count {
            let adj_u: Vec<usize> = graph.neighbors(u).map(|(v, _)| v).collect();

            // Only walk each edge from its smaller endpoint.
            for v in adj_u.iter().copied().filter(|&v| v > u) {
                let adj_v: Vec<usize> = graph.neighbors(v).map(|(w, _)| w).collect();

                let (mut i, mut j) = (0, 0);
                while let (Some(&a), Some(&b)) = (adj_u.get(i), adj_v.get(j)) {
                    // The third vertex must be larger than `v`.
                    if a <= v {
                        i += 1;
                        continue;
                    }
                    if b <= v {
                        j += 1;
                        continue;
                    }
                    match a.cmp(&b) {
                        std::cmp::Ordering::Equal => {
                            // Common neighbor `a` closes the triangle (u, v, a).
                            total_triangles += 1;
                            per_node_triangles[u] += 1;
                            per_node_triangles[v] += 1;
                            per_node_triangles[a] += 1;
                            i += 1;
                            j += 1;
                        }
                        std::cmp::Ordering::Less => i += 1,
                        std::cmp::Ordering::Greater => j += 1,
                    }
                }
            }
        }
    }

    TriangleCountResult {
        total_triangles,
        per_node_triangles,
    }
}
// Unit tests for the BFS, connected-components, PageRank and triangle-count
// routines, including the `parallel`-feature variants.
#[cfg(test)]
mod tests {
use super::*;
// Builds the path 0 - 1 - ... - (n-1) with unit weights. `n` must be at least
// 1 because `n - 1` is computed on `usize`.
// NOTE(review): the trailing `false` passed to `from_edges` presumably selects
// an undirected graph — confirm against `CsrGraph::from_edges`.
fn make_path_graph(n: usize) -> CsrGraph {
let edges: Vec<(usize, usize, f64)> = (0..n - 1).map(|i| (i, i + 1, 1.0)).collect();
CsrGraph::from_edges(n, edges, false).expect("build path")
}
// Builds the path above plus the closing edge (n-1) - 0. `n` must be at
// least 1.
fn make_cycle_graph(n: usize) -> CsrGraph {
let mut edges: Vec<(usize, usize, f64)> = (0..n - 1).map(|i| (i, i + 1, 1.0)).collect();
edges.push((n - 1, 0, 1.0));
CsrGraph::from_edges(n, edges, false).expect("build cycle")
}
// Builds a graph with one edge for every pair i < j of the n nodes.
fn make_complete_graph(n: usize) -> CsrGraph {
let mut edges = Vec::new();
for i in 0..n {
for j in i + 1..n {
edges.push((i, j, 1.0));
}
}
CsrGraph::from_edges(n, edges, false).expect("build complete")
}
// Five nodes in two groups: the triangle {0, 1, 2} and the single edge 3 - 4.
fn make_disconnected_graph() -> CsrGraph {
let edges = vec![(0, 1, 1.0), (1, 2, 1.0), (0, 2, 1.0), (3, 4, 1.0)];
CsrGraph::from_edges(5, edges, false).expect("build disconnected")
}
// On a 5-node path the distance from node 0 equals the node id and the
// deepest level is 4.
#[test]
fn test_bfs_path() {
let g = make_path_graph(5);
let result = bfs(&g, 0).expect("bfs");
assert_eq!(result.distances[0], 0);
assert_eq!(result.distances[1], 1);
assert_eq!(result.distances[4], 4);
assert_eq!(result.num_visited, 5);
assert_eq!(result.num_levels, 4);
}
// On a 6-cycle the node opposite the source is 3 hops away.
#[test]
fn test_bfs_cycle() {
let g = make_cycle_graph(6);
let result = bfs(&g, 0).expect("bfs");
assert_eq!(result.distances[0], 0);
assert_eq!(result.distances[1], 1);
assert_eq!(result.distances[3], 3); assert_eq!(result.num_visited, 6);
}
// Nodes outside the source's component keep the `usize::MAX` sentinel.
#[test]
fn test_bfs_disconnected() {
let g = make_disconnected_graph();
let result = bfs(&g, 0).expect("bfs");
assert_eq!(result.distances[0], 0);
assert_eq!(result.distances[1], 1);
assert_eq!(result.distances[2], 1);
assert_eq!(result.distances[3], usize::MAX); assert_eq!(result.distances[4], usize::MAX);
assert_eq!(result.num_visited, 3);
}
// A source id beyond the node count is reported as an error.
#[test]
fn test_bfs_invalid_source() {
let g = make_path_graph(3);
assert!(bfs(&g, 10).is_err());
}
// A lone node is visited at distance 0 and produces no further levels.
#[test]
fn test_bfs_single_node() {
let g = CsrGraph::from_edges(1, vec![], false).expect("build");
let result = bfs(&g, 0).expect("bfs");
assert_eq!(result.distances[0], 0);
assert_eq!(result.num_visited, 1);
assert_eq!(result.num_levels, 0);
}
// Parallel BFS on a 10-node path reaches the far end at distance 9.
#[cfg(feature = "parallel")]
#[test]
fn test_parallel_bfs_path() {
let g = make_path_graph(10);
let result = parallel_bfs(&g, 0).expect("parallel bfs");
assert_eq!(result.distances[0], 0);
assert_eq!(result.distances[9], 9);
assert_eq!(result.num_visited, 10);
}
// In a complete graph every other node is one hop from the source.
#[cfg(feature = "parallel")]
#[test]
fn test_parallel_bfs_complete() {
let g = make_complete_graph(5);
let result = parallel_bfs(&g, 0).expect("parallel bfs");
for i in 1..5 {
assert_eq!(result.distances[i], 1);
}
assert_eq!(result.num_visited, 5);
}
// A path is a single component containing all five nodes.
#[test]
fn test_cc_connected() {
let g = make_path_graph(5);
let result = connected_components(&g);
assert_eq!(result.num_components, 1);
let label = result.labels[0];
for &l in &result.labels {
assert_eq!(l, label);
}
assert_eq!(result.component_sizes[0], 5);
}
// The triangle and the separate edge end up in two different components.
#[test]
fn test_cc_disconnected() {
let g = make_disconnected_graph();
let result = connected_components(&g);
assert_eq!(result.num_components, 2);
assert_eq!(result.labels[0], result.labels[1]);
assert_eq!(result.labels[0], result.labels[2]);
assert_eq!(result.labels[3], result.labels[4]);
assert_ne!(result.labels[0], result.labels[3]);
}
// Without edges every node forms its own component of size 1.
#[test]
fn test_cc_isolated_nodes() {
let g = CsrGraph::from_edges(5, vec![], false).expect("build");
let result = connected_components(&g);
assert_eq!(result.num_components, 5);
for &size in &result.component_sizes {
assert_eq!(size, 1);
}
}
// The empty graph has no components.
#[test]
fn test_cc_empty() {
let g = CsrGraph::from_edges(0, vec![], false).expect("build");
let result = connected_components(&g);
assert_eq!(result.num_components, 0);
}
// The union-find variant groups the disconnected graph the same way.
#[cfg(feature = "parallel")]
#[test]
fn test_parallel_cc_disconnected() {
let g = make_disconnected_graph();
let result = parallel_connected_components(&g);
assert_eq!(result.num_components, 2);
assert_eq!(result.labels[0], result.labels[1]);
assert_eq!(result.labels[0], result.labels[2]);
assert_eq!(result.labels[3], result.labels[4]);
assert_ne!(result.labels[0], result.labels[3]);
}
// A complete graph on 10 nodes is one component of size 10.
#[cfg(feature = "parallel")]
#[test]
fn test_parallel_cc_connected() {
let g = make_complete_graph(10);
let result = parallel_connected_components(&g);
assert_eq!(result.num_components, 1);
assert_eq!(result.component_sizes[0], 10);
}
// On the 3-cycle 0 -> 1 -> 2 -> 0 every node is expected to score 1/3.
// NOTE(review): the trailing `true` passed to `from_edges` presumably selects
// a directed graph — confirm.
#[test]
fn test_pagerank_simple() {
let g = CsrGraph::from_edges(3, vec![(0, 1, 1.0), (1, 2, 1.0), (2, 0, 1.0)], true)
.expect("build");
let config = PageRankConfig {
damping: 0.85,
max_iterations: 100,
tolerance: 1e-8,
};
let result = pagerank(&g, &config).expect("pagerank");
let expected = 1.0 / 3.0;
for &score in &result.scores {
assert!(
(score - expected).abs() < 1e-4,
"score {score} != expected {expected}"
);
}
assert!(result.converged);
}
// With edges from nodes 1..=4 into node 0, node 0 must have the top score.
#[test]
fn test_pagerank_star() {
let edges: Vec<(usize, usize, f64)> = (1..5).map(|i| (i, 0, 1.0)).collect();
let g = CsrGraph::from_edges(5, edges, true).expect("build");
let result = pagerank(&g, &PageRankConfig::default()).expect("pagerank");
let max_node = result
.scores
.iter()
.enumerate()
.max_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
.map(|(i, _)| i)
.unwrap_or(0);
assert_eq!(max_node, 0);
}
// The empty graph yields no scores and counts as converged.
#[test]
fn test_pagerank_empty() {
let g = CsrGraph::from_edges(0, vec![], true).expect("build");
let result = pagerank(&g, &PageRankConfig::default()).expect("pagerank");
assert!(result.scores.is_empty());
assert!(result.converged);
}
// Node 2 has no outgoing edge here; the scores must still sum to about 1.
#[test]
fn test_pagerank_dangling() {
let g = CsrGraph::from_edges(3, vec![(0, 1, 1.0), (1, 2, 1.0)], true).expect("build");
let result = pagerank(&g, &PageRankConfig::default()).expect("pagerank");
let total: f64 = result.scores.iter().sum();
assert!(
(total - 1.0).abs() < 0.01,
"scores should sum to ~1.0, got {total}"
);
}
// On the 4-cycle every node is expected to score 0.25.
#[cfg(feature = "parallel")]
#[test]
fn test_parallel_pagerank_cycle() {
let g = CsrGraph::from_edges(
4,
vec![(0, 1, 1.0), (1, 2, 1.0), (2, 3, 1.0), (3, 0, 1.0)],
true,
)
.expect("build");
let result = parallel_pagerank(&g, &PageRankConfig::default()).expect("pagerank");
let expected = 0.25;
for &score in &result.scores {
assert!((score - expected).abs() < 1e-4);
}
assert!(result.converged);
}
// K3 is exactly one triangle, shared by all three nodes.
#[test]
fn test_triangle_count_k3() {
let g = make_complete_graph(3);
let result = triangle_count(&g);
assert_eq!(result.total_triangles, 1);
for &count in &result.per_node_triangles {
assert_eq!(count, 1);
}
}
// K4 has 4 triangles and each node takes part in 3 of them.
#[test]
fn test_triangle_count_k4() {
let g = make_complete_graph(4);
let result = triangle_count(&g);
assert_eq!(result.total_triangles, 4); for &count in &result.per_node_triangles {
assert_eq!(count, 3); }
}
// K5 has 10 triangles.
#[test]
fn test_triangle_count_k5() {
let g = make_complete_graph(5);
let result = triangle_count(&g);
assert_eq!(result.total_triangles, 10); }
// A path contains no triangle.
#[test]
fn test_triangle_count_path() {
let g = make_path_graph(5);
let result = triangle_count(&g);
assert_eq!(result.total_triangles, 0); }
// Triangle {0, 1, 2} with a pendant node 3 attached to node 2.
#[test]
fn test_triangle_count_single_triangle() {
let g = CsrGraph::from_edges(
4,
vec![(0, 1, 1.0), (1, 2, 1.0), (0, 2, 1.0), (2, 3, 1.0)],
false,
)
.expect("build");
let result = triangle_count(&g);
assert_eq!(result.total_triangles, 1);
assert_eq!(result.per_node_triangles[0], 1);
assert_eq!(result.per_node_triangles[1], 1);
assert_eq!(result.per_node_triangles[2], 1);
assert_eq!(result.per_node_triangles[3], 0); }
// Two nodes without edges: exercises the fewer-than-three-nodes early return.
#[test]
fn test_triangle_count_empty() {
let g = CsrGraph::from_edges(2, vec![], false).expect("build");
let result = triangle_count(&g);
assert_eq!(result.total_triangles, 0);
}
// The parallel variant must report the same total for K4.
#[cfg(feature = "parallel")]
#[test]
fn test_parallel_triangle_count_k4() {
let g = make_complete_graph(4);
let result = parallel_triangle_count(&g);
assert_eq!(result.total_triangles, 4);
}
// The parallel variant must report the same total for K5.
#[cfg(feature = "parallel")]
#[test]
fn test_parallel_triangle_count_k5() {
let g = make_complete_graph(5);
let result = parallel_triangle_count(&g);
assert_eq!(result.total_triangles, 10);
}
// The source has no parent; every other node sits one level below its parent.
#[test]
fn test_bfs_parent_tree() {
let g = make_path_graph(5);
let result = bfs(&g, 0).expect("bfs");
assert_eq!(result.parents[0], usize::MAX); for i in 1..5 {
let parent = result.parents[i];
assert!(parent < 5);
assert_eq!(result.distances[i], result.distances[parent] + 1);
}
}
// The endpoints of every adjacency entry must carry the same component label.
#[test]
fn test_cc_label_consistency() {
let g = make_disconnected_graph();
let result = connected_components(&g);
for node in 0..g.num_nodes() {
for (neighbor, _) in g.neighbors(node) {
assert_eq!(
result.labels[node], result.labels[neighbor],
"nodes {node} and {neighbor} should be in same component"
);
}
}
}
// With a tight tolerance on K5 the scores must still sum to about 1.
#[test]
fn test_pagerank_scores_sum_to_one() {
let g = make_complete_graph(5);
let config = PageRankConfig {
max_iterations: 200,
tolerance: 1e-10,
..Default::default()
};
let result = pagerank(&g, &config).expect("pagerank");
let total: f64 = result.scores.iter().sum();
assert!(
(total - 1.0).abs() < 0.01,
"PageRank scores should sum to ~1.0, got {total}"
);
}
}