use crate::datatypes::values::Value;
use crate::graph::schema::{DirGraph, InternedKey};
use crate::graph::storage::GraphRead;
use petgraph::algo::kosaraju_scc;
use petgraph::graph::NodeIndex;
use std::collections::{HashMap, HashSet};
use std::time::Instant;
/// Builds the error text returned when a CALL procedure runs past its deadline.
///
/// The message tells the caller how to extend (`timeout_ms=N`) or disable
/// (`timeout_ms=0`) the deadline.
pub fn algorithm_timeout_err() -> String {
    String::from(concat!(
        "CALL procedure timed out. Pass timeout_ms=N to cypher() to extend, ",
        "or timeout_ms=0 to disable the deadline. Subgraph scoping is not ",
        "yet supported — large graphs may not converge within the default 20s.",
    ))
}
/// Converts an optional list of connection-type names into interned keys.
///
/// Returns `None` when no filter was supplied; otherwise one `InternedKey`
/// per input name, in input order.
fn intern_connection_types(connection_types: Option<&[String]>) -> Option<Vec<InternedKey>> {
    let names = connection_types?;
    let mut keys = Vec::with_capacity(names.len());
    for name in names {
        keys.push(InternedKey::from_str(name));
    }
    Some(keys)
}
/// Collects the distinct neighbors of `node`, ignoring edge direction.
///
/// With `connection_types == None` every neighbor reported by
/// `neighbors_undirected` is taken. Otherwise only edges whose
/// `connection_type` equals one of the given keys contribute: targets of
/// outgoing edges first, then sources of incoming edges.
///
/// The returned vector is sorted and de-duplicated.
fn filtered_neighbors_undirected(
    graph: &DirGraph,
    node: NodeIndex,
    connection_types: Option<&[InternedKey]>,
) -> Vec<NodeIndex> {
    use petgraph::Direction;
    let store = &graph.graph;
    let mut found: Vec<NodeIndex> = if let Some(allowed) = connection_types {
        let mut picked = Vec::new();
        picked.extend(
            store
                .edges_directed(node, Direction::Outgoing)
                .filter(|edge| allowed.iter().any(|t| *t == edge.weight().connection_type))
                .map(|edge| edge.target()),
        );
        picked.extend(
            store
                .edges_directed(node, Direction::Incoming)
                .filter(|edge| allowed.iter().any(|t| *t == edge.weight().connection_type))
                .map(|edge| edge.source()),
        );
        picked
    } else {
        store.neighbors_undirected(node).collect()
    };
    // Zero or one entry is already sorted and unique.
    if found.len() >= 2 {
        found.sort_unstable();
        found.dedup();
    }
    found
}
/// Collects the targets of `node`'s outgoing edges.
///
/// With `connection_types == None` every outgoing neighbor is returned.
/// Otherwise only edges whose `connection_type` equals one of the given keys
/// are followed. The result is not de-duplicated.
fn filtered_neighbors_outgoing(
    graph: &DirGraph,
    node: NodeIndex,
    connection_types: Option<&[InternedKey]>,
) -> Vec<NodeIndex> {
    use petgraph::Direction;
    let store = &graph.graph;
    let Some(allowed) = connection_types else {
        return store
            .neighbors_directed(node, Direction::Outgoing)
            .collect();
    };
    let mut targets = Vec::new();
    for edge in store.edges_directed(node, Direction::Outgoing) {
        if allowed.iter().any(|t| *t == edge.weight().connection_type) {
            targets.push(edge.target());
        }
    }
    targets
}
/// Tells whether `node` may be used as an intermediate hop.
///
/// Without a filter (`None`) every node passes. With a filter, the node
/// passes only if it exists and its node-type string is in the set.
fn node_passes_via_filter(
    graph: &DirGraph,
    node: NodeIndex,
    via_types: &Option<HashSet<&str>>,
) -> bool {
    let Some(allowed) = via_types else {
        return true;
    };
    match graph.graph.node_weight(node) {
        Some(data) => allowed.contains(data.node_type_str(&graph.interner)),
        // A node that cannot be looked up never satisfies the filter.
        None => false,
    }
}
/// A path between two nodes together with its length.
#[derive(Debug, Clone)]
pub struct PathResult {
/// Node indices along the path, from source to target inclusive.
pub path: Vec<NodeIndex>,
/// Number of hops; the shortest-path functions in this file set it to
/// `path.len() - 1` (saturating at 0).
pub cost: usize,
}
/// Descriptive data for one node, as produced by `get_node_info`.
#[derive(Debug, Clone)]
pub struct PathNodeInfo {
/// Node-type string resolved through the graph's interner.
pub node_type: String,
/// Node title; non-string titles are rendered with their `Debug` form.
pub title: String,
/// Owned copy of the node's id value.
pub id: Value,
}
/// Finds a shortest path between `source` and `target`, ignoring edge direction.
///
/// * `connection_types` – when given, only edges of these types are followed.
/// * `via_types` – when given, intermediate nodes must have one of these
///   node types (the target itself is exempt).
/// * `deadline` – when given, the search gives up once this instant passes.
///
/// Returns `None` when no path is found or the deadline expires. Otherwise
/// returns the path and its hop count.
pub fn shortest_path(
    graph: &DirGraph,
    source: NodeIndex,
    target: NodeIndex,
    connection_types: Option<&[String]>,
    via_types: Option<&[String]>,
    deadline: Option<Instant>,
) -> Option<PathResult> {
    let interned = intern_connection_types(connection_types);
    let via_set: Option<HashSet<&str>> = match via_types {
        Some(names) => Some(names.iter().map(String::as_str).collect()),
        None => None,
    };
    reconstruct_path_bfs(
        graph,
        source,
        target,
        interned.as_deref(),
        &via_set,
        deadline,
    )
    .map(|path| {
        let cost = path.len().saturating_sub(1);
        PathResult { path, cost }
    })
}
/// Returns the hop count of a shortest undirected path from `source` to `target`.
///
/// Runs a level-by-level BFS over `neighbors_undirected` without recording
/// predecessors, so only the distance is produced.
///
/// Returns `Some(0)` when `source == target`. Returns `None` when the target
/// is not reached, or when either index lies outside the graph's node bound.
pub fn shortest_path_cost(graph: &DirGraph, source: NodeIndex, target: NodeIndex) -> Option<usize> {
    if source == target {
        return Some(0);
    }
    let g = &graph.graph;
    let node_bound = g.node_bound();
    let source_idx = source.index();
    let target_idx = target.index();
    // An out-of-range index cannot belong to the graph; report "no path"
    // instead of panicking on the `visited` lookup below.
    if source_idx >= node_bound || target_idx >= node_bound {
        return None;
    }
    let mut visited: Vec<bool> = vec![false; node_bound];
    let mut current_level: Vec<usize> = vec![source_idx];
    let mut next_level: Vec<usize> = Vec::new();
    visited[source_idx] = true;
    let mut depth: usize = 0;
    while !current_level.is_empty() {
        depth += 1;
        next_level.clear();
        for &current_idx in &current_level {
            for neighbor in g.neighbors_undirected(NodeIndex::new(current_idx)) {
                let neighbor_idx = neighbor.index();
                if visited[neighbor_idx] {
                    continue;
                }
                if neighbor_idx == target_idx {
                    return Some(depth);
                }
                visited[neighbor_idx] = true;
                next_level.push(neighbor_idx);
            }
        }
        std::mem::swap(&mut current_level, &mut next_level);
    }
    None
}
pub fn shortest_path_cost_batch(
graph: &DirGraph,
pairs: &[(NodeIndex, NodeIndex)],
) -> Vec<Option<usize>> {
let node_bound = graph.graph.node_bound();
let nodes: Vec<NodeIndex> = {
let g = &graph.graph;
g.node_indices().collect()
};
let n = nodes.len();
let mut node_to_idx = vec![usize::MAX; node_bound];
for (i, &node) in nodes.iter().enumerate() {
node_to_idx[node.index()] = i;
}
let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
for edge in {
let g = &graph.graph;
g.edge_references()
} {
let src_i = node_to_idx[edge.source().index()];
let tgt_i = node_to_idx[edge.target().index()];
if src_i != usize::MAX && tgt_i != usize::MAX {
adj[src_i].push(tgt_i);
adj[tgt_i].push(src_i);
}
}
for neighbors in &mut adj {
neighbors.sort_unstable();
neighbors.dedup();
}
let mut visited: Vec<bool> = vec![false; n];
let mut current_level: Vec<usize> = Vec::new();
let mut next_level: Vec<usize> = Vec::new();
let mut results = Vec::with_capacity(pairs.len());
for &(source, target) in pairs {
if source == target {
results.push(Some(0));
continue;
}
let src_i = node_to_idx[source.index()];
let tgt_i = node_to_idx[target.index()];
if src_i == usize::MAX || tgt_i == usize::MAX {
results.push(None);
continue;
}
let mut touched: Vec<usize> = Vec::new();
current_level.clear();
current_level.push(src_i);
visited[src_i] = true;
touched.push(src_i);
let mut depth: usize = 0;
let mut found = false;
'bfs: while !current_level.is_empty() {
depth += 1;
next_level.clear();
for ¤t_idx in ¤t_level {
for &neighbor_idx in &adj[current_idx] {
if !visited[neighbor_idx] {
if neighbor_idx == tgt_i {
found = true;
break 'bfs;
}
visited[neighbor_idx] = true;
touched.push(neighbor_idx);
next_level.push(neighbor_idx);
}
}
}
std::mem::swap(&mut current_level, &mut next_level);
}
results.push(if found { Some(depth) } else { None });
for &idx in &touched {
visited[idx] = false;
}
}
results
}
/// Undirected BFS from `source` to `target` that also rebuilds the path.
///
/// Predecessors are kept in a hash map, so memory grows with the visited
/// region only. Intermediate nodes must satisfy `via_types`; the target is
/// exempt. The deadline is polled once every 1000 dequeued nodes.
///
/// Returns the node sequence from `source` to `target`. Returns `None` when
/// the target is not reached or the deadline has passed.
fn reconstruct_path_bfs(
    graph: &DirGraph,
    source: NodeIndex,
    target: NodeIndex,
    connection_types: Option<&[InternedKey]>,
    via_types: &Option<HashSet<&str>>,
    deadline: Option<Instant>,
) -> Option<Vec<NodeIndex>> {
    use std::collections::{HashMap, VecDeque};
    if source == target {
        return Some(vec![source]);
    }
    let source_idx = source.index();
    let target_idx = target.index();
    let mut came_from: HashMap<usize, u32> = HashMap::with_capacity(64);
    let mut frontier: VecDeque<usize> = VecDeque::with_capacity(64);
    // The source maps to itself, which also marks it as visited.
    came_from.insert(source_idx, source_idx as u32);
    frontier.push_back(source_idx);
    let mut dequeued = 0u32;
    while let Some(current_idx) = frontier.pop_front() {
        dequeued += 1;
        if dequeued.is_multiple_of(1000) && deadline.is_some_and(|dl| Instant::now() > dl) {
            return None;
        }
        let around =
            filtered_neighbors_undirected(graph, NodeIndex::new(current_idx), connection_types);
        for neighbor in around {
            let neighbor_idx = neighbor.index();
            if came_from.contains_key(&neighbor_idx) {
                continue;
            }
            let reached_target = neighbor_idx == target_idx;
            if !reached_target && !node_passes_via_filter(graph, neighbor, via_types) {
                continue;
            }
            came_from.insert(neighbor_idx, current_idx as u32);
            if !reached_target {
                frontier.push_back(neighbor_idx);
                continue;
            }
            // Walk the predecessor chain back to the source, then flip it.
            let mut path = Vec::with_capacity(16);
            let mut cursor = target_idx;
            while cursor != source_idx {
                path.push(NodeIndex::new(cursor));
                cursor = came_from[&cursor] as usize;
            }
            path.push(source);
            path.reverse();
            return Some(path);
        }
    }
    None
}
/// Finds a shortest path from `source` to `target` following outgoing edges only.
///
/// * `connection_types` – when given, only edges of these types are followed.
/// * `via_types` – when given, intermediate nodes must have one of these
///   node types (the target itself is exempt).
/// * `deadline` – polled once every 1000 dequeued nodes.
///
/// Returns `None` when the target is not reached or the deadline has passed.
/// A search from a node to itself yields a single-node path of cost 0.
pub fn shortest_path_directed(
    graph: &DirGraph,
    source: NodeIndex,
    target: NodeIndex,
    connection_types: Option<&[String]>,
    via_types: Option<&[String]>,
    deadline: Option<Instant>,
) -> Option<PathResult> {
    use std::collections::VecDeque;
    if source == target {
        return Some(PathResult {
            path: vec![source],
            cost: 0,
        });
    }
    let via_set: Option<HashSet<&str>> =
        via_types.map(|names| names.iter().map(String::as_str).collect());
    let interned = intern_connection_types(connection_types);
    let node_bound = graph.graph.node_bound();
    let (source_idx, target_idx) = (source.index(), target.index());

    let mut seen = vec![false; node_bound];
    let mut predecessor = vec![u32::MAX; node_bound];
    let mut frontier: VecDeque<usize> = VecDeque::with_capacity(node_bound / 4);
    frontier.push_back(source_idx);
    seen[source_idx] = true;

    let mut dequeued = 0u32;
    while let Some(current_idx) = frontier.pop_front() {
        dequeued += 1;
        if dequeued.is_multiple_of(1000) && deadline.is_some_and(|dl| Instant::now() > dl) {
            return None;
        }
        let successors =
            filtered_neighbors_outgoing(graph, NodeIndex::new(current_idx), interned.as_deref());
        for neighbor in successors {
            let neighbor_idx = neighbor.index();
            if seen[neighbor_idx] {
                continue;
            }
            let is_target = neighbor_idx == target_idx;
            if !is_target && !node_passes_via_filter(graph, neighbor, &via_set) {
                continue;
            }
            seen[neighbor_idx] = true;
            predecessor[neighbor_idx] = current_idx as u32;
            frontier.push_back(neighbor_idx);
            if is_target {
                // Follow predecessors back to the source, then reverse.
                let mut path = Vec::with_capacity(16);
                let mut cursor = target_idx;
                while cursor != source_idx {
                    path.push(NodeIndex::new(cursor));
                    cursor = predecessor[cursor] as usize;
                }
                path.push(source);
                path.reverse();
                let cost = path.len().saturating_sub(1);
                return Some(PathResult { path, cost });
            }
        }
    }
    None
}
/// Enumerates simple paths from `source` to `target`, ignoring edge direction.
///
/// * `max_hops` – maximum number of edges in a path.
/// * `max_results` – when given, enumeration stops once this many paths exist.
/// * `connection_types` – when given, only edges of these types are followed.
/// * `via_types` – when given, intermediate nodes must have one of these
///   node types (the target itself is exempt).
/// * `deadline` – when given, the search stops descending once it has passed;
///   paths found so far are still returned.
#[allow(clippy::too_many_arguments)]
pub fn all_paths(
    graph: &DirGraph,
    source: NodeIndex,
    target: NodeIndex,
    max_hops: usize,
    max_results: Option<usize>,
    connection_types: Option<&[String]>,
    via_types: Option<&[String]>,
    deadline: Option<Instant>,
) -> Vec<Vec<NodeIndex>> {
    let interned = intern_connection_types(connection_types);
    let via_set: Option<HashSet<&str>> =
        via_types.map(|names| names.iter().map(String::as_str).collect());

    let mut found = Vec::new();
    let mut trail = vec![source];
    // Nodes currently on the trail; prevents revisiting within one path.
    let mut on_trail: HashSet<NodeIndex> = HashSet::from([source]);

    find_all_paths_recursive(
        graph,
        source,
        target,
        max_hops,
        &mut trail,
        &mut on_trail,
        &mut found,
        max_results,
        interned.as_deref(),
        &via_set,
        deadline,
    );
    found
}
/// Depth-first helper behind `all_paths`.
///
/// `current_path` and `visited` describe the trail being extended; both are
/// restored before returning. Each completed path is cloned into `results`.
/// Recursion stops when the result cap is reached, the deadline has passed,
/// the target is reached, or no hops remain.
#[allow(clippy::only_used_in_recursion, clippy::too_many_arguments)]
fn find_all_paths_recursive(
    graph: &DirGraph,
    current: NodeIndex,
    target: NodeIndex,
    remaining_hops: usize,
    current_path: &mut Vec<NodeIndex>,
    visited: &mut HashSet<NodeIndex>,
    results: &mut Vec<Vec<NodeIndex>>,
    max_results: Option<usize>,
    connection_types: Option<&[InternedKey]>,
    via_types: &Option<HashSet<&str>>,
    deadline: Option<Instant>,
) {
    let cap_reached = |count: usize| max_results.is_some_and(|max| count >= max);

    if cap_reached(results.len()) {
        return;
    }
    if deadline.is_some_and(|dl| Instant::now() > dl) {
        return;
    }
    if current == target {
        results.push(current_path.clone());
        return;
    }
    if remaining_hops == 0 {
        return;
    }

    for neighbor in filtered_neighbors_undirected(graph, current, connection_types) {
        // A sibling branch may have filled the quota in the meantime.
        if cap_reached(results.len()) {
            return;
        }
        if visited.contains(&neighbor) {
            continue;
        }
        if neighbor != target && !node_passes_via_filter(graph, neighbor, via_types) {
            continue;
        }
        visited.insert(neighbor);
        current_path.push(neighbor);
        find_all_paths_recursive(
            graph,
            neighbor,
            target,
            remaining_hops - 1,
            current_path,
            visited,
            results,
            max_results,
            connection_types,
            via_types,
            deadline,
        );
        // Backtrack so other branches may pass through this neighbor.
        current_path.pop();
        visited.remove(&neighbor);
    }
}
/// Returns the graph's components as lists of node indices.
///
/// In-memory graphs are handed to petgraph's `kosaraju_scc`. Disk-backed
/// graphs go through `weakly_connected_components` with no deadline.
///
/// NOTE(review): `kosaraju_scc` computes *strongly* connected components,
/// whereas the disk path computes *weakly* connected ones, so the two
/// backends can disagree on directed graphs — confirm this is intended.
pub fn connected_components(graph: &DirGraph) -> Vec<Vec<NodeIndex>> {
    if !GraphRead::is_disk(&graph.graph) {
        return kosaraju_scc(graph.graph.as_stable_digraph());
    }
    weakly_connected_components(graph, None)
        .expect("weakly_connected_components with deadline=None cannot time out")
}
/// Groups nodes into weakly connected components with a union-find.
///
/// Every edge merges the sets of its two endpoints regardless of direction.
/// The deadline is polled once every 2^20 edges.
///
/// Returns the components ordered by descending size, or the timeout
/// message from `algorithm_timeout_err` when the deadline has passed.
pub fn weakly_connected_components(
    graph: &DirGraph,
    deadline: Option<Instant>,
) -> Result<Vec<Vec<NodeIndex>>, String> {
    /// Finds the representative of `x`, halving the path along the way.
    #[inline]
    fn find(parent: &mut [usize], mut x: usize) -> usize {
        loop {
            let up = parent[x];
            if up == x {
                return x;
            }
            let grand = parent[up];
            parent[x] = grand;
            x = grand;
        }
    }

    /// Merges the sets containing `a` and `b` using union by rank.
    #[inline]
    fn union(parent: &mut [usize], rank: &mut [u8], a: usize, b: usize) {
        let ra = find(parent, a);
        let rb = find(parent, b);
        if ra == rb {
            return;
        }
        match rank[ra].cmp(&rank[rb]) {
            std::cmp::Ordering::Less => parent[ra] = rb,
            std::cmp::Ordering::Greater => parent[rb] = ra,
            std::cmp::Ordering::Equal => {
                parent[rb] = ra;
                rank[ra] += 1;
            }
        }
    }

    let store = &graph.graph;
    let nodes: Vec<NodeIndex> = store.node_indices().collect();
    if nodes.is_empty() {
        return Ok(Vec::new());
    }
    let n = nodes.len();

    // Raw node index -> dense position in `nodes`.
    let mut node_to_idx = vec![0usize; store.node_bound()];
    for (dense, node) in nodes.iter().enumerate() {
        node_to_idx[node.index()] = dense;
    }

    let mut parent: Vec<usize> = (0..n).collect();
    let mut rank = vec![0u8; n];

    let mut edges_seen: usize = 0;
    for edge in store.edge_references() {
        edges_seen += 1;
        if (edges_seen & 0xFFFFF) == 0 && deadline.is_some_and(|dl| Instant::now() > dl) {
            return Err(algorithm_timeout_err());
        }
        let a = node_to_idx[edge.source().index()];
        let b = node_to_idx[edge.target().index()];
        union(&mut parent, &mut rank, a, b);
    }

    let mut by_root: HashMap<usize, Vec<NodeIndex>> = HashMap::new();
    for (dense, &node) in nodes.iter().enumerate() {
        by_root
            .entry(find(&mut parent, dense))
            .or_default()
            .push(node);
    }

    let mut components: Vec<Vec<NodeIndex>> = by_root.into_values().collect();
    components.sort_by_key(|members| std::cmp::Reverse(members.len()));
    Ok(components)
}
/// Looks up a node and returns its type, title and id.
///
/// Returns `None` when `graph.get_node` finds nothing for `node_idx`.
/// A string title is copied as is; any other title value is rendered with
/// its `Debug` representation.
pub fn get_node_info(graph: &DirGraph, node_idx: NodeIndex) -> Option<PathNodeInfo> {
    let node = graph.get_node(node_idx)?;
    let node_title = node.title();
    let title = if let Value::String(text) = &*node_title {
        text.clone()
    } else {
        format!("{:?}", &*node_title)
    };
    let node_type = node.node_type_str(&graph.interner).to_string();
    let id = node.id().into_owned();
    Some(PathNodeInfo {
        node_type,
        title,
        id,
    })
}
/// Returns the connection type used for each consecutive pair in `path`.
///
/// For every step the first edge found from the earlier node to the later
/// one is used. If there is none, the reverse direction is tried. `None` is
/// recorded when neither direction has an edge. The result has
/// `path.len() - 1` entries (empty for paths shorter than two nodes).
pub fn get_path_connections(graph: &DirGraph, path: &[NodeIndex]) -> Vec<Option<String>> {
    let label_between = |from: NodeIndex, to: NodeIndex| -> Option<String> {
        graph
            .graph
            .edges(from)
            .find(|e| e.target() == to)
            .map(|e| e.weight().connection_type_str(&graph.interner).to_string())
    };
    path.windows(2)
        .map(|step| {
            let (from, to) = (step[0], step[1]);
            label_between(from, to).or_else(|| label_between(to, from))
        })
        .collect()
}
/// Tells whether an undirected path exists between `source` and `target`.
///
/// Delegates to `shortest_path` with no edge filter, no via filter and no
/// deadline.
pub fn are_connected(graph: &DirGraph, source: NodeIndex, target: NodeIndex) -> bool {
    let route = shortest_path(graph, source, target, None, None, None);
    route.is_some()
}
pub fn node_degree(graph: &DirGraph, node: NodeIndex) -> usize {
let g = &graph.graph;
g.edges(node).count()
+ g.neighbors_directed(node, petgraph::Direction::Incoming)
.count()
}
/// Centrality score for one node, as returned by the centrality functions
/// in this file (betweenness, PageRank, degree, closeness).
#[derive(Debug, Clone)]
pub struct CentralityResult {
/// The node the score belongs to.
pub node_idx: NodeIndex,
/// The computed score; its scale depends on the algorithm and on whether
/// normalization was requested.
pub score: f64,
}
/// Per-worker buffers for single-source accumulation in
/// `betweenness_centrality`. Every buffer is returned to its neutral state
/// after each source, so one instance can be reused for many sources.
struct BrandesScratch {
    stack: Vec<usize>,
    pred: Vec<Vec<usize>>,
    sigma: Vec<f64>,
    dist: Vec<i64>,
    delta: Vec<f64>,
    queue: std::collections::VecDeque<usize>,
}

impl BrandesScratch {
    /// Allocates buffers for a graph with `n` dense node positions.
    fn new(n: usize) -> Self {
        Self {
            stack: Vec::with_capacity(n),
            pred: vec![Vec::new(); n],
            sigma: vec![0.0; n],
            dist: vec![-1; n],
            delta: vec![0.0; n],
            queue: std::collections::VecDeque::with_capacity(n),
        }
    }

    /// Runs one BFS from `s_idx` over `adj` and adds every reached node's
    /// dependency on `s_idx` into `scores` (the source itself is excluded).
    fn accumulate_from(&mut self, adj: &[Vec<usize>], s_idx: usize, scores: &mut [f64]) {
        self.stack.clear();
        self.queue.clear();
        self.sigma[s_idx] = 1.0;
        self.dist[s_idx] = 0;
        self.queue.push_back(s_idx);

        // Forward phase: count shortest paths and record predecessors.
        while let Some(v_idx) = self.queue.pop_front() {
            self.stack.push(v_idx);
            let v_dist = self.dist[v_idx];
            for &w_idx in &adj[v_idx] {
                let d = self.dist[w_idx];
                if d < 0 {
                    self.dist[w_idx] = v_dist + 1;
                    self.queue.push_back(w_idx);
                    self.sigma[w_idx] += self.sigma[v_idx];
                    self.pred[w_idx].push(v_idx);
                } else if d == v_dist + 1 {
                    self.sigma[w_idx] += self.sigma[v_idx];
                    self.pred[w_idx].push(v_idx);
                }
            }
        }

        // Backward phase: propagate dependencies in reverse BFS order and
        // reset every touched entry for the next source.
        while let Some(w_idx) = self.stack.pop() {
            for &v_idx in &self.pred[w_idx] {
                let contribution =
                    (self.sigma[v_idx] / self.sigma[w_idx]) * (1.0 + self.delta[w_idx]);
                self.delta[v_idx] += contribution;
            }
            if w_idx != s_idx {
                scores[w_idx] += self.delta[w_idx];
            }
            self.pred[w_idx].clear();
            self.sigma[w_idx] = 0.0;
            self.dist[w_idx] = -1;
            self.delta[w_idx] = 0.0;
        }
    }
}

/// Computes betweenness centrality on the undirected view of the graph.
///
/// * `normalized` – when true, scores are scaled by `2 / ((n-1)(n-2))`.
/// * `sample_size` – when `Some(k)` with `k < n`, only `k` evenly spaced
///   source nodes are used and the scores are extrapolated by `n / k`.
///   `Some(0)` uses no sources and yields all-zero scores.
/// * `connection_types` – when given, only edges of these types are used.
/// * `deadline` – polled every 10 sources (per chunk in the parallel path).
///
/// Graphs with 4096 nodes or more are processed in parallel with rayon.
///
/// Returns one result per node, sorted by descending score. Graphs with two
/// or fewer nodes get all-zero scores in node order.
///
/// # Errors
/// Returns the message from `algorithm_timeout_err` once the deadline passes.
pub fn betweenness_centrality(
    graph: &DirGraph,
    normalized: bool,
    sample_size: Option<usize>,
    connection_types: Option<&[String]>,
    deadline: Option<Instant>,
) -> Result<Vec<CentralityResult>, String> {
    use std::sync::atomic::{AtomicBool, Ordering};
    let store = &graph.graph;
    let nodes: Vec<NodeIndex> = store.node_indices().collect();
    let n = nodes.len();
    if n <= 2 {
        return Ok(nodes
            .iter()
            .map(|&idx| CentralityResult {
                node_idx: idx,
                score: 0.0,
            })
            .collect());
    }

    // Raw node index -> dense position in `nodes`.
    let mut node_to_idx = vec![0usize; store.node_bound()];
    for (i, &node) in nodes.iter().enumerate() {
        node_to_idx[node.index()] = i;
    }

    let interned_ct = intern_connection_types(connection_types);
    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
    for edge in store.edge_references() {
        if let Some(ref types) = interned_ct {
            if !types.iter().any(|t| *t == edge.weight().connection_type) {
                continue;
            }
        }
        let src_i = node_to_idx[edge.source().index()];
        let tgt_i = node_to_idx[edge.target().index()];
        adj[src_i].push(tgt_i);
        adj[tgt_i].push(src_i);
    }
    for neighbors in &mut adj {
        neighbors.sort_unstable();
        neighbors.dedup();
    }

    let source_indices: Vec<usize> = match sample_size.map(|k| k.min(n)) {
        Some(k) if k < n => {
            // Evenly spaced sample over the dense node positions.
            let step = n as f64 / k as f64;
            (0..k).map(|i| (i as f64 * step) as usize).collect()
        }
        _ => (0..n).collect(),
    };

    let use_parallel = n >= 4096;
    let timed_out = AtomicBool::new(false);

    let mut betweenness: Vec<f64> = if use_parallel {
        use rayon::prelude::*;
        let adj_ref = &adj;
        let deadline_ref = &deadline;
        let timed_out_ref = &timed_out;
        let num_threads = rayon::current_num_threads();
        let chunk_size = (source_indices.len() / num_threads).max(1);
        source_indices
            .par_chunks(chunk_size)
            .map(|chunk| {
                let mut local_betweenness: Vec<f64> = vec![0.0; n];
                let mut scratch = BrandesScratch::new(n);
                for (local_counter, &s_idx) in chunk.iter().enumerate() {
                    if local_counter % 10 == 0 {
                        if timed_out_ref.load(Ordering::Relaxed) {
                            break;
                        }
                        if let Some(dl) = deadline_ref {
                            if Instant::now() > *dl {
                                timed_out_ref.store(true, Ordering::Relaxed);
                                break;
                            }
                        }
                    }
                    scratch.accumulate_from(adj_ref, s_idx, &mut local_betweenness);
                }
                local_betweenness
            })
            .reduce(
                || vec![0.0; n],
                |mut a, b| {
                    for (acc, add) in a.iter_mut().zip(&b) {
                        *acc += *add;
                    }
                    a
                },
            )
    } else {
        let mut totals: Vec<f64> = vec![0.0; n];
        let mut scratch = BrandesScratch::new(n);
        for (source_counter, &s_idx) in source_indices.iter().enumerate() {
            if source_counter.is_multiple_of(10) {
                if let Some(dl) = deadline {
                    if Instant::now() > dl {
                        return Err(algorithm_timeout_err());
                    }
                }
            }
            scratch.accumulate_from(&adj, s_idx, &mut totals);
        }
        totals
    };

    if timed_out.load(Ordering::Relaxed) {
        return Err(algorithm_timeout_err());
    }

    // Each undirected shortest path was counted once from either endpoint.
    for score in betweenness.iter_mut() {
        *score /= 2.0;
    }
    // n > 2 is guaranteed by the early return above.
    if normalized {
        let scale = 2.0 / ((n - 1) as f64 * (n - 2) as f64);
        for score in betweenness.iter_mut() {
            *score *= scale;
        }
    }
    if let Some(k) = sample_size {
        // k == 0 means no sources ran; skipping the n / 0 factor keeps the
        // scores at 0.0 instead of turning them into NaN.
        if k > 0 && k < n {
            let scale = n as f64 / k as f64;
            for score in betweenness.iter_mut() {
                *score *= scale;
            }
        }
    }

    let mut results: Vec<CentralityResult> = nodes
        .iter()
        .enumerate()
        .map(|(i, &node_idx)| CentralityResult {
            node_idx,
            score: betweenness[i],
        })
        .collect();
    results.sort_unstable_by(|a, b| {
        b.score
            .partial_cmp(&a.score)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    Ok(results)
}
pub fn pagerank(
graph: &DirGraph,
damping_factor: f64,
max_iterations: usize,
tolerance: f64,
connection_types: Option<&[String]>,
deadline: Option<Instant>,
) -> Result<Vec<CentralityResult>, String> {
let nodes: Vec<NodeIndex> = {
let g = &graph.graph;
g.node_indices().collect()
};
let n = nodes.len();
if n == 0 {
return Ok(Vec::new());
}
let bound = graph.graph.node_bound();
let mut node_to_idx = vec![0usize; bound];
for (i, &node) in nodes.iter().enumerate() {
node_to_idx[node.index()] = i;
}
let interned_ct = intern_connection_types(connection_types);
let mut in_adj: Vec<Vec<usize>> = vec![Vec::new(); n];
let mut out_degrees: Vec<usize> = vec![0; n];
for edge in {
let g = &graph.graph;
g.edge_references()
} {
if let Some(ref types) = interned_ct {
if !types.iter().any(|t| *t == edge.weight().connection_type) {
continue;
}
}
let src_i = node_to_idx[edge.source().index()];
let tgt_i = node_to_idx[edge.target().index()];
in_adj[tgt_i].push(src_i);
out_degrees[src_i] += 1;
}
let mut pr: Vec<f64> = vec![1.0 / n as f64; n];
let mut new_pr: Vec<f64> = vec![0.0; n];
let inv_out_degrees: Vec<f64> = out_degrees
.iter()
.map(|&d| {
if d > 0 {
damping_factor / d as f64
} else {
0.0
}
})
.collect();
let is_dangling: Vec<bool> = out_degrees.iter().map(|&d| d == 0).collect();
let teleport = (1.0 - damping_factor) / n as f64;
let inv_n = 1.0 / n as f64;
let use_parallel = n >= 4096;
for _iteration in 0..max_iterations {
if let Some(dl) = deadline {
if Instant::now() > dl {
return Err(algorithm_timeout_err());
}
}
let dangling_sum: f64 = if use_parallel {
use rayon::prelude::*;
(0..n)
.into_par_iter()
.filter(|&i| is_dangling[i])
.map(|i| pr[i])
.sum()
} else {
(0..n).filter(|&i| is_dangling[i]).map(|i| pr[i]).sum()
};
let base_score = teleport + damping_factor * dangling_sum * inv_n;
if use_parallel {
use rayon::prelude::*;
new_pr.par_iter_mut().enumerate().for_each(|(j, score)| {
let mut s = base_score;
for &src in &in_adj[j] {
s += inv_out_degrees[src] * pr[src];
}
*score = s;
});
} else {
for j in 0..n {
let mut s = base_score;
for &src in &in_adj[j] {
s += inv_out_degrees[src] * pr[src];
}
new_pr[j] = s;
}
}
let diff: f64 = if use_parallel {
use rayon::prelude::*;
pr.par_iter()
.zip(new_pr.par_iter())
.map(|(a, b)| (a - b).abs())
.sum()
} else {
pr.iter()
.zip(new_pr.iter())
.map(|(a, b)| (a - b).abs())
.sum()
};
std::mem::swap(&mut pr, &mut new_pr);
if diff < tolerance {
break;
}
}
let mut results: Vec<CentralityResult> = nodes
.iter()
.enumerate()
.map(|(i, &node_idx)| CentralityResult {
node_idx,
score: pr[i],
})
.collect();
results.sort_unstable_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
Ok(results)
}
pub fn degree_centrality(
graph: &DirGraph,
normalized: bool,
connection_types: Option<&[String]>,
deadline: Option<Instant>,
) -> Result<Vec<CentralityResult>, String> {
let nodes: Vec<NodeIndex> = {
let g = &graph.graph;
g.node_indices().collect()
};
let n = nodes.len();
if n == 0 {
return Ok(Vec::new());
}
let scale = if normalized && n > 1 {
1.0 / (n - 1) as f64
} else {
1.0
};
let interned_ct = intern_connection_types(connection_types);
let bound = graph.graph.node_bound();
let mut degrees = vec![0usize; bound];
let mut edge_counter: usize = 0;
for edge in {
let g = &graph.graph;
g.edge_references()
} {
edge_counter += 1;
if edge_counter & 0xFFFFF == 0 {
if let Some(dl) = deadline {
if Instant::now() > dl {
return Err(algorithm_timeout_err());
}
}
}
if let Some(ref types) = interned_ct {
if !types.iter().any(|t| *t == edge.weight().connection_type) {
continue;
}
}
degrees[edge.source().index()] += 1; degrees[edge.target().index()] += 1; }
let mut results: Vec<CentralityResult> = nodes
.iter()
.map(|&node_idx| CentralityResult {
node_idx,
score: degrees[node_idx.index()] as f64 * scale,
})
.collect();
results.sort_unstable_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
Ok(results)
}
/// Reusable BFS buffers for `closeness_centrality`.
///
/// `dist` holds `-1` for every node not reached by the most recent search;
/// `touched` lists the nodes that search did reach, so the next search can
/// reset only those entries.
struct ClosenessScratch {
    dist: Vec<i64>,
    current_level: Vec<usize>,
    next_level: Vec<usize>,
    touched: Vec<usize>,
}

impl ClosenessScratch {
    /// Allocates buffers for `n` dense node positions; the level and touched
    /// lists start with room for `level_capacity` entries.
    fn new(n: usize, level_capacity: usize) -> Self {
        Self {
            dist: vec![-1; n],
            current_level: Vec::with_capacity(level_capacity),
            next_level: Vec::with_capacity(level_capacity),
            touched: Vec::with_capacity(level_capacity),
        }
    }

    /// Runs a level-by-level BFS from `s_idx` over `adj` and returns the
    /// closeness score of `s_idx`: `(reachable - 1) / total_distance`,
    /// additionally scaled by `(reachable - 1) / (n - 1)` when `normalized`.
    /// Returns 0.0 when nothing besides `s_idx` is reached.
    fn score_from(&mut self, adj: &[Vec<usize>], s_idx: usize, normalized: bool) -> f64 {
        // Undo the previous search before starting a new one.
        for &idx in &self.touched {
            self.dist[idx] = -1;
        }
        self.touched.clear();
        self.current_level.clear();

        self.current_level.push(s_idx);
        self.dist[s_idx] = 0;
        self.touched.push(s_idx);

        let mut depth: i64 = 0;
        while !self.current_level.is_empty() {
            depth += 1;
            self.next_level.clear();
            for &current_idx in &self.current_level {
                for &neighbor_idx in &adj[current_idx] {
                    if self.dist[neighbor_idx] < 0 {
                        self.dist[neighbor_idx] = depth;
                        self.next_level.push(neighbor_idx);
                        self.touched.push(neighbor_idx);
                    }
                }
            }
            std::mem::swap(&mut self.current_level, &mut self.next_level);
        }

        let n = self.dist.len();
        let reachable = self.touched.len();
        let total_distance: i64 = self.touched.iter().map(|&idx| self.dist[idx]).sum();
        if reachable > 1 && total_distance > 0 {
            let closeness = (reachable - 1) as f64 / total_distance as f64;
            if normalized {
                closeness * (reachable - 1) as f64 / (n - 1) as f64
            } else {
                closeness
            }
        } else {
            0.0
        }
    }
}

/// Computes closeness centrality from BFS distances along incoming edges.
///
/// For each evaluated node a BFS follows edges backwards (from a node to the
/// sources of its incoming edges), and the score is derived from the
/// distances to the nodes reached that way.
///
/// * `normalized` – when true, each score is additionally scaled by the
///   fraction of other nodes that were reached.
/// * `sample_size` – when `Some(k)` with `k < n`, only `k` evenly spaced
///   nodes are evaluated and only those appear in the result.
/// * `connection_types` – when given, only edges of these types are used.
/// * `deadline` – polled every 10 sources (every 100 in the parallel path).
///
/// When 4096 or more sources are evaluated the work runs in parallel with rayon.
///
/// Returns one result per evaluated node, sorted by descending score, or
/// the timeout message from `algorithm_timeout_err` when the deadline has passed.
pub fn closeness_centrality(
    graph: &DirGraph,
    normalized: bool,
    sample_size: Option<usize>,
    connection_types: Option<&[String]>,
    deadline: Option<Instant>,
) -> Result<Vec<CentralityResult>, String> {
    use std::sync::atomic::{AtomicBool, Ordering};
    let store = &graph.graph;
    let nodes: Vec<NodeIndex> = store.node_indices().collect();
    let n = nodes.len();
    if n == 0 {
        return Ok(Vec::new());
    }

    // Raw node index -> dense position in `nodes`.
    let mut node_to_idx = vec![0usize; store.node_bound()];
    for (i, &node) in nodes.iter().enumerate() {
        node_to_idx[node.index()] = i;
    }

    let interned_ct = intern_connection_types(connection_types);
    let mut adj_incoming: Vec<Vec<usize>> = vec![Vec::new(); n];
    for edge in store.edge_references() {
        if let Some(ref types) = interned_ct {
            if !types.iter().any(|t| *t == edge.weight().connection_type) {
                continue;
            }
        }
        let src_i = node_to_idx[edge.source().index()];
        let tgt_i = node_to_idx[edge.target().index()];
        adj_incoming[tgt_i].push(src_i);
    }
    for neighbors in &mut adj_incoming {
        neighbors.sort_unstable();
        neighbors.dedup();
    }

    let source_indices: Vec<usize> = match sample_size.map(|k| k.min(n)) {
        Some(k) if k < n => {
            // Evenly spaced sample over the dense node positions.
            let step = n as f64 / k as f64;
            (0..k).map(|i| (i as f64 * step) as usize).collect()
        }
        _ => (0..n).collect(),
    };

    let use_parallel = source_indices.len() >= 4096;
    let timed_out = AtomicBool::new(false);

    let mut results: Vec<CentralityResult> = if use_parallel {
        use rayon::prelude::*;
        let adj_ref = &adj_incoming;
        let deadline_ref = &deadline;
        let nodes_ref = &nodes;
        let timed_out_ref = &timed_out;
        source_indices
            .par_iter()
            .enumerate()
            .map_init(
                // One scratch per rayon work split, reused across its sources.
                || ClosenessScratch::new(n, n / 4),
                |scratch, (i, &s_idx)| {
                    let source = nodes_ref[s_idx];
                    if i % 100 == 0 {
                        if timed_out_ref.load(Ordering::Relaxed) {
                            return CentralityResult {
                                node_idx: source,
                                score: 0.0,
                            };
                        }
                        if let Some(dl) = deadline_ref {
                            if Instant::now() > *dl {
                                timed_out_ref.store(true, Ordering::Relaxed);
                                return CentralityResult {
                                    node_idx: source,
                                    score: 0.0,
                                };
                            }
                        }
                    }
                    CentralityResult {
                        node_idx: source,
                        score: scratch.score_from(adj_ref, s_idx, normalized),
                    }
                },
            )
            .collect()
    } else {
        let mut collected = Vec::with_capacity(source_indices.len());
        let mut scratch = ClosenessScratch::new(n, n);
        for (i, &s_idx) in source_indices.iter().enumerate() {
            if i.is_multiple_of(10) {
                if let Some(dl) = deadline {
                    if Instant::now() > dl {
                        return Err(algorithm_timeout_err());
                    }
                }
            }
            collected.push(CentralityResult {
                node_idx: nodes[s_idx],
                score: scratch.score_from(&adj_incoming, s_idx, normalized),
            });
        }
        collected
    };

    // Only the parallel path sets this flag; its placeholder scores must not
    // be returned as real results.
    if timed_out.load(Ordering::Relaxed) {
        return Err(algorithm_timeout_err());
    }

    results.sort_unstable_by(|a, b| {
        b.score
            .partial_cmp(&a.score)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    Ok(results)
}
/// Community membership of a single node.
#[derive(Debug, Clone)]
pub struct CommunityAssignment {
/// The node being assigned.
pub node_idx: NodeIndex,
/// Identifier of the community the node belongs to.
pub community_id: usize,
}
/// Outcome of a community-detection run.
#[derive(Debug)]
pub struct CommunityResult {
/// One assignment per node.
pub assignments: Vec<CommunityAssignment>,
/// Number of distinct communities among the assignments.
pub num_communities: usize,
/// Modularity value reported for the partition.
pub modularity: f64,
}
/// Detects communities with local modularity-gain moves (the first phase of
/// the Louvain method) on the undirected, weighted view of the graph.
///
/// * `weight_property` – forwarded to `edge_weight` to obtain each edge's
///   weight (presumably the name of an edge property — confirm against
///   `edge_weight`).
/// * `resolution` – scales the expected-weight penalty term of the gain.
/// * `connection_types` – when given, only edges of these types form the
///   adjacency used for the moves.
/// * `deadline` – polled at the start of each sweep over the nodes.
///
/// At most 100 sweeps are performed; the loop ends early once a sweep moves
/// no node. Community ids in the result are renumbered `0..k` in order of
/// first appearance.
///
/// When the total edge weight is zero every node is its own community and
/// the modularity is reported as 0.0.
///
/// NOTE(review): `compute_modularity` receives the whole graph and no
/// `connection_types` filter — confirm that it matches the filtered
/// adjacency used here.
///
/// # Errors
/// Returns the message from `algorithm_timeout_err` once the deadline passes.
pub fn louvain_communities(
    graph: &DirGraph,
    weight_property: Option<&str>,
    resolution: f64,
    connection_types: Option<&[String]>,
    deadline: Option<Instant>,
) -> Result<CommunityResult, String> {
    let store = &graph.graph;
    let nodes: Vec<NodeIndex> = store.node_indices().collect();
    let n = nodes.len();
    if n == 0 {
        return Ok(CommunityResult {
            assignments: Vec::new(),
            num_communities: 0,
            modularity: 0.0,
        });
    }

    // Raw node index -> dense position in `nodes`.
    let bound = store.node_bound();
    let mut node_to_idx = vec![0usize; bound];
    for (i, &node) in nodes.iter().enumerate() {
        node_to_idx[node.index()] = i;
    }

    let interned_ct = intern_connection_types(connection_types);
    let mut adj: Vec<Vec<(usize, f64)>> = vec![Vec::new(); n];
    let mut total_weight = 0.0f64;
    for edge in store.edge_references() {
        if let Some(ref types) = interned_ct {
            if !types.iter().any(|t| *t == edge.weight().connection_type) {
                continue;
            }
        }
        let w = edge_weight(graph, edge.id(), weight_property);
        let src_i = node_to_idx[edge.source().index()];
        let tgt_i = node_to_idx[edge.target().index()];
        adj[src_i].push((tgt_i, w));
        adj[tgt_i].push((src_i, w));
        total_weight += w;
    }
    // Merge parallel edges: one entry per neighbor carrying the summed weight.
    for neighbors in &mut adj {
        neighbors.sort_unstable_by_key(|&(idx, _)| idx);
        neighbors.dedup_by(|a, b| {
            if a.0 == b.0 {
                b.1 += a.1;
                true
            } else {
                false
            }
        });
    }

    if total_weight == 0.0 {
        let assignments: Vec<CommunityAssignment> = nodes
            .iter()
            .enumerate()
            .map(|(i, &idx)| CommunityAssignment {
                node_idx: idx,
                community_id: i,
            })
            .collect();
        return Ok(CommunityResult {
            assignments,
            num_communities: n,
            modularity: 0.0,
        });
    }

    let mut community: Vec<usize> = (0..n).collect();
    // Weighted degree of every node.
    let degree: Vec<f64> = adj
        .iter()
        .map(|neighbors| neighbors.iter().fold(0.0, |acc, &(_, w)| acc + w))
        .collect();
    // Each node starts alone, so a community's total equals its node's degree.
    let mut sigma_tot: Vec<f64> = degree.clone();

    let m = total_weight;
    let two_m = 2.0 * m;
    let inv_m = 1.0 / m;
    let resolution_over_two_m_sq = resolution / (two_m * two_m);

    // Scratch: weight from the current node into each neighboring community.
    let mut comm_weight: Vec<f64> = vec![0.0; n];
    let mut touched_comms: Vec<usize> = Vec::with_capacity(64);

    let max_iterations = 100;
    for _ in 0..max_iterations {
        if let Some(dl) = deadline {
            if Instant::now() > dl {
                return Err(algorithm_timeout_err());
            }
        }
        let mut improved = false;
        for i in 0..n {
            let current_community = community[i];
            let k_i = degree[i];
            let k_i_res = k_i * resolution_over_two_m_sq;

            touched_comms.clear();
            for &(neighbor, w) in &adj[i] {
                let c = community[neighbor];
                if comm_weight[c] == 0.0 {
                    touched_comms.push(c);
                }
                comm_weight[c] += w;
            }

            let k_i_in_current = comm_weight[current_community];
            let mut best_community = current_community;
            let mut best_delta = 0.0f64;
            for &cand_community in &touched_comms {
                if cand_community == current_community {
                    continue;
                }
                let k_i_in_cand = comm_weight[cand_community];
                let sigma_cand = sigma_tot[cand_community];
                let sigma_curr = sigma_tot[current_community] - k_i;
                let gain_add = k_i_in_cand * inv_m - sigma_cand * k_i_res;
                let loss_remove = k_i_in_current * inv_m - sigma_curr * k_i_res;
                let delta = gain_add - loss_remove;
                if delta > best_delta {
                    best_delta = delta;
                    best_community = cand_community;
                }
            }

            // Reset only the scratch entries this node touched.
            for &c in &touched_comms {
                comm_weight[c] = 0.0;
            }

            if best_community != current_community {
                sigma_tot[current_community] -= k_i;
                sigma_tot[best_community] += k_i;
                community[i] = best_community;
                improved = true;
            }
        }
        if !improved {
            break;
        }
    }

    // Community label per raw node index, in the form `compute_modularity` expects.
    let mut community_bound: Vec<usize> = vec![0; bound];
    let mut node_exists: Vec<bool> = vec![false; bound];
    for (i, &node) in nodes.iter().enumerate() {
        community_bound[node.index()] = community[i];
        node_exists[node.index()] = true;
    }

    // Renumber communities densely in order of first appearance and build
    // the assignments in the same pass.
    let mut id_map: HashMap<usize, usize> = HashMap::new();
    let assignments: Vec<CommunityAssignment> = nodes
        .iter()
        .zip(&community)
        .map(|(&node_idx, &label)| {
            let next_id = id_map.len();
            let community_id = *id_map.entry(label).or_insert(next_id);
            CommunityAssignment {
                node_idx,
                community_id,
            }
        })
        .collect();
    let num_communities = id_map.len();

    let modularity = compute_modularity(
        graph,
        &community_bound,
        &node_exists,
        total_weight,
        weight_property,
    );
    Ok(CommunityResult {
        assignments,
        num_communities,
        modularity,
    })
}
pub fn label_propagation(
graph: &DirGraph,
max_iterations: usize,
connection_types: Option<&[String]>,
deadline: Option<Instant>,
) -> Result<CommunityResult, String> {
let nodes: Vec<NodeIndex> = {
let g = &graph.graph;
g.node_indices().collect()
};
let n = nodes.len();
if n == 0 {
return Ok(CommunityResult {
assignments: Vec::new(),
num_communities: 0,
modularity: 0.0,
});
}
let bound = graph.graph.node_bound();
let mut node_to_idx = vec![0usize; bound];
for (i, &node) in nodes.iter().enumerate() {
node_to_idx[node.index()] = i;
}
let interned_ct = intern_connection_types(connection_types);
let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
for edge in {
let g = &graph.graph;
g.edge_references()
} {
if let Some(ref types) = interned_ct {
if !types.iter().any(|t| *t == edge.weight().connection_type) {
continue;
}
}
let src_i = node_to_idx[edge.source().index()];
let tgt_i = node_to_idx[edge.target().index()];
adj[src_i].push(tgt_i);
adj[tgt_i].push(src_i);
}
for neighbors in &mut adj {
neighbors.sort_unstable();
neighbors.dedup();
}
let mut labels: Vec<usize> = (0..n).collect();
let mut label_count: Vec<usize> = vec![0; n];
let mut touched_labels: Vec<usize> = Vec::with_capacity(64);
for _ in 0..max_iterations {
if let Some(dl) = deadline {
if Instant::now() > dl {
return Err(algorithm_timeout_err());
}
}
let mut changed = false;
for i in 0..n {
let neighbors = &adj[i];
if neighbors.is_empty() {
continue; }
touched_labels.clear();
for &neighbor in neighbors {
let lbl = labels[neighbor];
if label_count[lbl] == 0 {
touched_labels.push(lbl);
}
label_count[lbl] += 1;
}
let mut best_label = labels[i];
let mut best_count = 0;
for &lbl in &touched_labels {
if label_count[lbl] > best_count {
best_count = label_count[lbl];
best_label = lbl;
}
}
for &lbl in &touched_labels {
label_count[lbl] = 0;
}
if best_label != labels[i] {
labels[i] = best_label;
changed = true;
}
}
if !changed {
break;
}
}
let mut labels_bound: Vec<usize> = vec![0; bound];
let mut node_exists: Vec<bool> = vec![false; bound];
for (i, &node) in nodes.iter().enumerate() {
labels_bound[node.index()] = labels[i];
node_exists[node.index()] = true;
}
let mut id_map: HashMap<usize, usize> = HashMap::new();
for &lbl in &labels {
let next_id = id_map.len();
id_map.entry(lbl).or_insert(next_id);
}
let assignments: Vec<CommunityAssignment> = nodes
.iter()
.enumerate()
.map(|(i, &idx)| CommunityAssignment {
node_idx: idx,
community_id: *id_map.get(&labels[i]).unwrap(),
})
.collect();
let total_weight = graph.graph.edge_count() as f64;
let num_communities = id_map.len();
let modularity = compute_modularity(graph, &labels_bound, &node_exists, total_weight, None);
Ok(CommunityResult {
assignments,
num_communities,
modularity,
})
}
/// Resolves the numeric weight of an edge.
///
/// Falls back to `1.0` when no `weight_property` is requested, when the edge
/// is not found, when the edge lacks that property, or when the property value
/// cannot be converted to `f64`.
pub(crate) fn edge_weight(
    graph: &DirGraph,
    edge_id: petgraph::graph::EdgeIndex,
    weight_property: Option<&str>,
) -> f64 {
    const DEFAULT_WEIGHT: f64 = 1.0;

    let Some(prop) = weight_property else {
        return DEFAULT_WEIGHT;
    };
    let g = &graph.graph;
    let Some(edge_data) = g.edge_weight(edge_id) else {
        return DEFAULT_WEIGHT;
    };
    if let Some(val) = edge_data.get_property(prop) {
        return crate::graph::core::value_operations::value_to_f64(val).unwrap_or(DEFAULT_WEIGHT);
    }
    DEFAULT_WEIGHT
}
/// A path found by `shortest_path_weighted` together with its total cost.
#[derive(Debug, Clone)]
pub struct WeightedPathResult {
/// Nodes along the path, ordered from the source to the target (both included).
pub path: Vec<NodeIndex>,
/// Sum of the edge weights along `path`; `0.0` when source and target coincide.
pub weight: f64,
}
pub fn shortest_path_weighted(
graph: &DirGraph,
source: NodeIndex,
target: NodeIndex,
weight_property: &str,
connection_types: Option<&[String]>,
via_types: Option<&[String]>,
deadline: Option<Instant>,
) -> Option<WeightedPathResult> {
use std::cmp::Ordering;
use std::collections::BinaryHeap;
if source == target {
return Some(WeightedPathResult {
path: vec![source],
weight: 0.0,
});
}
let via_set: Option<HashSet<&str>> =
via_types.map(|vt| vt.iter().map(|s| s.as_str()).collect());
let interned = intern_connection_types(connection_types);
let conn_filter = interned.as_deref();
let node_bound = graph.graph.node_bound();
let mut dist: Vec<f64> = vec![f64::INFINITY; node_bound];
let mut parent: Vec<u32> = vec![u32::MAX; node_bound];
dist[source.index()] = 0.0;
#[derive(PartialEq)]
struct State(f64, usize);
impl Eq for State {}
impl Ord for State {
fn cmp(&self, other: &Self) -> Ordering {
other
.0
.partial_cmp(&self.0)
.unwrap_or(Ordering::Equal)
.then_with(|| self.1.cmp(&other.1))
}
}
impl PartialOrd for State {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
let mut heap: BinaryHeap<State> = BinaryHeap::new();
heap.push(State(0.0, source.index()));
let mut visit_count = 0u32;
while let Some(State(d, current_idx)) = heap.pop() {
if d > dist[current_idx] {
continue;
}
if current_idx == target.index() {
let mut path = Vec::with_capacity(16);
let mut idx = current_idx;
while idx != source.index() {
path.push(NodeIndex::new(idx));
idx = parent[idx] as usize;
}
path.push(source);
path.reverse();
return Some(WeightedPathResult { path, weight: d });
}
visit_count += 1;
if visit_count.is_multiple_of(1000) {
if let Some(dl) = deadline {
if Instant::now() > dl {
return None;
}
}
}
let current = NodeIndex::new(current_idx);
for edge in graph
.graph
.edges_directed(current, petgraph::Direction::Outgoing)
.chain(
graph
.graph
.edges_directed(current, petgraph::Direction::Incoming),
)
{
if let Some(types) = conn_filter {
if !types.iter().any(|t| *t == edge.weight().connection_type) {
continue;
}
}
let neighbor = if edge.source() == current {
edge.target()
} else {
edge.source()
};
let n_idx = neighbor.index();
if n_idx != target.index() && !node_passes_via_filter(graph, neighbor, &via_set) {
continue;
}
let w = edge_weight(graph, edge.id(), Some(weight_property));
if w < 0.0 {
return None;
}
let next = d + w;
if next < dist[n_idx] {
dist[n_idx] = next;
parent[n_idx] = current_idx as u32;
heap.push(State(next, n_idx));
}
}
}
None
}
/// Returns only the total weight of the path found by
/// `shortest_path_weighted`, or `None` when that search yields no path.
///
/// All arguments are forwarded unchanged.
pub fn shortest_path_cost_weighted(
    graph: &DirGraph,
    source: NodeIndex,
    target: NodeIndex,
    weight_property: &str,
    connection_types: Option<&[String]>,
    via_types: Option<&[String]>,
    deadline: Option<Instant>,
) -> Option<f64> {
    let found = shortest_path_weighted(
        graph,
        source,
        target,
        weight_property,
        connection_types,
        via_types,
        deadline,
    )?;
    Some(found.weight)
}
/// Computes the Newman modularity `Q` of a node partition.
///
/// Uses the per-community form
///
/// ```text
/// Q = Σ_c [ L_c / m − (D_c / 2m)² ]
/// ```
///
/// where `m` is `total_weight`, `L_c` is the weight of the edges whose two
/// endpoints both belong to community `c`, and `D_c` is the summed degree
/// (outgoing plus incoming edge weight) of the members of `c`. This is the
/// same quantity as `(1/2m) Σ_ij [A_ij − k_i k_j / 2m] δ(c_i, c_j)` taken over
/// *all* node pairs; summing the `k_i k_j` term over adjacent pairs only (as
/// an earlier version did) does not yield modularity — e.g. it reports a
/// non-zero score for a partition that puts every node in one community.
///
/// # Arguments
/// * `community` - community label per node, indexed by `NodeIndex::index()`.
/// * `node_exists` - marks which slots of `community` hold a real assignment.
/// * `total_weight` - `m` in the formula; `0.0` short-circuits to `0.0`.
/// * `weight_property` - edge property holding the weight; with `None` every
///   edge weighs `1.0`.
fn compute_modularity(
    graph: &DirGraph,
    community: &[usize],
    node_exists: &[bool],
    total_weight: f64,
    weight_property: Option<&str>,
) -> f64 {
    if total_weight == 0.0 {
        return 0.0;
    }
    let two_m = 2.0 * total_weight;
    let g = &graph.graph;

    // Size the per-community accumulators from the largest label in use.
    // Indexing by label (rather than hashing) keeps the summation order, and
    // therefore the floating-point result, deterministic.
    let label_slots = g
        .node_indices()
        .map(|node| node.index())
        .filter(|&i| node_exists[i])
        .map(|i| community[i] + 1)
        .max()
        .unwrap_or(0);
    let mut internal_weight = vec![0.0f64; label_slots];
    let mut degree_total = vec![0.0f64; label_slots];

    // D_c: every node contributes its outgoing plus incoming edge weight.
    for node in g.node_indices() {
        let i = node.index();
        if !node_exists[i] {
            continue;
        }
        let mut degree = 0.0f64;
        for edge in g.edges(node) {
            degree += edge_weight(graph, edge.id(), weight_property);
        }
        for edge in g.edges_directed(node, petgraph::Direction::Incoming) {
            degree += edge_weight(graph, edge.id(), weight_property);
        }
        degree_total[community[i]] += degree;
    }

    // L_c: weight of edges that stay inside a single community.
    for edge in g.edge_references() {
        let u = edge.source().index();
        let v = edge.target().index();
        // Slots without a real assignment hold a placeholder label that must
        // not be mistaken for membership in a community.
        if !node_exists[u] || !node_exists[v] {
            continue;
        }
        if community[u] == community[v] {
            internal_weight[community[u]] += edge_weight(graph, edge.id(), weight_property);
        }
    }

    internal_weight
        .iter()
        .zip(&degree_total)
        .map(|(&l_c, &d_c)| l_c / total_weight - (d_c / two_m).powi(2))
        .sum()
}
// Unit tests for this module live in the sibling file named by `#[path]`
// below and are compiled only in test builds.
#[cfg(test)]
#[path = "graph_algorithms_tests.rs"]
mod tests;