use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::cmp::Reverse;
use crate::graph::Graph;
use crate::types::{ulid_encode, DbError, Value};
use super::{opt_bool, opt_direction, opt_f64, opt_usize, opt_weight_prop, Direction,
GraphSnapshot, Row};
/// Computes PageRank scores for every node of `graph` by power iteration.
///
/// Parameters read from `params` (all optional):
/// - `damping` (default `0.85`): probability of following an outgoing edge rather than
///   teleporting; must lie strictly between 0 and 1.
/// - `iterations` (default `20`): maximum number of power-iteration steps; at least 1.
/// - `tolerance` (default `1e-6`): iteration stops early once the largest per-node change
///   between two steps falls below this value; must be non-negative (NaN is rejected).
/// - `normalize` (default `true`): rescale the final scores so they sum to 1.
/// - the optional edge-weight parameter parsed by `opt_weight_prop`; every edge weight is
///   clamped to be non-negative before use.
///
/// Rank flows from a node to its out-neighbours in proportion to edge weight. A node with no
/// positive outgoing weight (no out-edges at all, or only zero/negative-weight ones) is
/// dangling: its rank is redistributed uniformly over all nodes.
///
/// Returns one row per node with `node` (the node id encoded by `ulid_encode`) and `score`.
///
/// # Errors
/// Returns `DbError::Query` when `damping`, `iterations` or `tolerance` is out of range, and
/// propagates any error from the `opt_*` parameter readers.
pub fn run_pagerank(graph: &Graph, params: &HashMap<String, Value>) -> Result<Vec<Row>, DbError> {
    let damping = opt_f64(params, "damping", 0.85)?;
    // Negated range check so that NaN is rejected as well.
    if !(0.0 < damping && damping < 1.0) {
        return Err(DbError::Query(
            "parameter 'damping' must be strictly between 0 and 1".into(),
        ));
    }
    let iterations = opt_usize(params, "iterations", 20)?;
    if iterations == 0 {
        return Err(DbError::Query("parameter 'iterations' must be at least 1".into()));
    }
    let tolerance = opt_f64(params, "tolerance", 1e-6)?;
    if tolerance.is_nan() || tolerance < 0.0 {
        return Err(DbError::Query("parameter 'tolerance' must be non-negative".into()));
    }
    let normalize = opt_bool(params, "normalize", true)?;
    let weight_prop = opt_weight_prop(params)?;
    let snap = GraphSnapshot::build(graph, weight_prop);
    let n = snap.n;
    if n == 0 {
        return Ok(vec![]);
    }
    let n_f = n as f64;

    // Total non-negative out-weight per node: the denominator of its transition probabilities.
    let out_weight_sum: Vec<f64> = (0..n)
        .map(|i| snap.adj_out[i].iter().map(|&(_, w)| w.max(0.0)).sum())
        .collect();
    // A node with zero total out-weight cannot pass rank along its edges (w / 0 would be NaN
    // and poison every score), so it is handled exactly like a node without out-edges.
    let dangling: Vec<usize> = (0..n).filter(|&i| out_weight_sum[i] <= 0.0).collect();

    let teleport = (1.0 - damping) / n_f;
    let mut rank = vec![1.0f64 / n_f; n];
    let mut new_rank = vec![0.0f64; n];
    for _ in 0..iterations {
        // Dangling rank is spread evenly over all nodes, like teleportation.
        let dangling_sum = dangling.iter().map(|&i| rank[i]).sum::<f64>() / n_f;
        new_rank.fill(teleport + damping * dangling_sum);
        for (u, &denom) in out_weight_sum.iter().enumerate() {
            if denom <= 0.0 {
                continue;
            }
            let rank_u = rank[u];
            for &(v, w) in &snap.adj_out[u] {
                new_rank[v] += rank_u * damping * (w.max(0.0) / denom);
            }
        }
        // Convergence is measured with the L-infinity norm of the per-node change.
        let delta = rank
            .iter()
            .zip(&new_rank)
            .map(|(a, b)| (a - b).abs())
            .fold(0.0f64, f64::max);
        // Reuse the old buffer for the next iteration instead of reallocating.
        std::mem::swap(&mut rank, &mut new_rank);
        if delta < tolerance {
            break;
        }
    }

    if normalize {
        let total: f64 = rank.iter().sum();
        if total > 0.0 {
            for r in &mut rank {
                *r /= total;
            }
        }
    }
    Ok(rank
        .into_iter()
        .enumerate()
        .map(|(i, score)| {
            let mut row = HashMap::new();
            row.insert(
                "node".to_string(),
                Value::String(ulid_encode(snap.node_ids[i].0)),
            );
            row.insert("score".to_string(), Value::Float(score));
            row
        })
        .collect())
}
/// Computes betweenness centrality for every node with Brandes' algorithm.
///
/// Parameters read from `params` (all optional):
/// - `normalized` (default `true`): divide by the number of node pairs that exclude the node
///   itself — `(n-1)(n-2)` ordered pairs when directed, `(n-1)(n-2)/2` unordered pairs when
///   undirected. Skipped when `n <= 2`.
/// - `directed` (default `true`): follow edges only along their direction (`Direction::Out`);
///   when `false`, edges are traversed both ways (`Direction::Any`).
/// - `sampleSize` (default `0`): when `0 < sampleSize < n`, only `sampleSize` evenly spaced
///   source nodes are used and the result is scaled by `n / sampleSize`.
/// - the optional edge-weight parameter parsed by `opt_weight_prop`; when present, shortest
///   paths are weighted (Dijkstra) instead of hop counts (BFS).
///
/// In undirected mode every unordered pair {s, t} is explored once from `s` and once from `t`,
/// so raw scores are halved to count each pair once.
///
/// Returns one row per node with `node` (encoded by `ulid_encode`) and `score`.
///
/// # Errors
/// Propagates any error from the `opt_*` parameter readers.
pub fn run_betweenness(graph: &Graph, params: &HashMap<String, Value>) -> Result<Vec<Row>, DbError> {
    let normalized = opt_bool(params, "normalized", true)?;
    let directed = opt_bool(params, "directed", true)?;
    let sample_size = opt_usize(params, "sampleSize", 0)?;
    let weight_prop = opt_weight_prop(params)?;
    // Choose the shortest-path kernel before `weight_prop` is handed to the snapshot.
    let weighted = weight_prop.is_some();
    let snap = GraphSnapshot::build(graph, weight_prop);
    let n = snap.n;
    if n == 0 {
        return Ok(vec![]);
    }
    let dir = if directed { Direction::Out } else { Direction::Any };
    let sampled = sample_size > 0 && sample_size < n;
    let sources: Vec<usize> = if sampled {
        // Deterministic, evenly spaced sample of source nodes.
        let step = n / sample_size;
        (0..sample_size).map(|k| k * step).collect()
    } else {
        (0..n).collect()
    };

    let mut betweenness = vec![0.0f64; n];
    let mut delta = vec![0.0f64; n];
    for &s in &sources {
        let (dist, sigma, pred) = if weighted {
            brandes_dijkstra(&snap, s, dir)
        } else {
            brandes_bfs(&snap, s, dir)
        };
        // Accumulate dependencies from the farthest reachable nodes back toward `s`, so each
        // node's dependency is final before it is pushed to its predecessors.
        let mut stack_order: Vec<usize> = (0..n).filter(|&v| dist[v].is_finite()).collect();
        stack_order.sort_unstable_by(|&a, &b| {
            dist[b]
                .partial_cmp(&dist[a])
                .expect("only finite distances are sorted")
        });
        delta.fill(0.0);
        for &w in &stack_order {
            for &v in &pred[w] {
                if sigma[w] > 0.0 {
                    delta[v] += (sigma[v] / sigma[w]) * (1.0 + delta[w]);
                }
            }
            if w != s {
                betweenness[w] += delta[w];
            }
        }
    }

    let n_f = n as f64;
    let mut scale = 1.0f64;
    if !directed {
        // Undirected traversal counts every unordered pair twice (from either endpoint).
        scale *= 0.5;
    }
    if sampled {
        // Extrapolate the partial sum over `sample_size` sources to all `n` sources.
        scale *= n_f / sample_size as f64;
    }
    if normalized && n > 2 {
        // Pairs not containing the node: ordered when directed, unordered when undirected.
        let pairs = (n_f - 1.0) * (n_f - 2.0);
        scale *= if directed { 1.0 / pairs } else { 2.0 / pairs };
    }
    for b in &mut betweenness {
        *b *= scale;
    }

    Ok(betweenness
        .into_iter()
        .enumerate()
        .map(|(i, score)| {
            let mut row = HashMap::new();
            row.insert(
                "node".to_string(),
                Value::String(ulid_encode(snap.node_ids[i].0)),
            );
            row.insert("score".to_string(), Value::Float(score));
            row
        })
        .collect())
}
/// Unweighted single-source shortest-path phase of Brandes' algorithm.
///
/// Runs a breadth-first search from `s`, following edges in direction `dir`, and returns
/// `(dist, sigma, pred)`:
/// - `dist[v]`: hop count from `s` (`f64::INFINITY` when `v` is unreachable);
/// - `sigma[v]`: number of shortest paths from `s` to `v`;
/// - `pred[v]`: predecessors of `v` on those paths (one entry per contributing edge).
fn brandes_bfs(
    snap: &GraphSnapshot,
    s: usize,
    dir: Direction,
) -> (Vec<f64>, Vec<f64>, Vec<Vec<usize>>) {
    let size = snap.n;
    let mut hops = vec![f64::INFINITY; size];
    let mut paths = vec![0.0f64; size];
    let mut parents: Vec<Vec<usize>> = vec![Vec::new(); size];
    hops[s] = 0.0;
    paths[s] = 1.0;

    let mut frontier = VecDeque::from([s]);
    while let Some(u) = frontier.pop_front() {
        for (x, _) in snap.neighbors(u, dir) {
            // First discovery fixes the hop distance and schedules the node.
            if hops[x].is_infinite() {
                hops[x] = hops[u] + 1.0;
                frontier.push_back(x);
            }
            // Every edge that lands exactly one level deeper extends shortest paths.
            let on_shortest_path = (hops[x] - hops[u] - 1.0).abs() < 1e-9;
            if on_shortest_path {
                paths[x] += paths[u];
                parents[x].push(u);
            }
        }
    }
    (hops, paths, parents)
}
/// Weighted single-source shortest-path phase of Brandes' algorithm (Dijkstra variant).
///
/// Follows edges from `s` in direction `dir`, with each edge weight clamped to be
/// non-negative, and returns `(dist, sigma, pred)`:
/// - `dist[v]`: shortest weighted distance from `s` (`f64::INFINITY` when unreachable);
/// - `sigma[v]`: number of shortest paths from `s` to `v`;
/// - `pred[v]`: predecessors of `v` on those paths.
///
/// Candidate distances within `1e-12` of the current best are counted as ties.
///
/// NOTE(review): with zero-weight edges the tie branch can also fire for a node that has
/// already been popped, including an edge leading back to `s` itself. That can inflate
/// `sigma` and create predecessor cycles — confirm whether zero weights need special handling.
fn brandes_dijkstra(
    snap: &GraphSnapshot,
    s: usize,
    dir: Direction,
) -> (Vec<f64>, Vec<f64>, Vec<Vec<usize>>) {
    const TIE_EPS: f64 = 1e-12;
    let size = snap.n;
    let mut best = vec![f64::INFINITY; size];
    let mut paths = vec![0.0f64; size];
    let mut parents: Vec<Vec<usize>> = vec![Vec::new(); size];
    best[s] = 0.0;
    paths[s] = 1.0;

    // Min-heap keyed on the distance's bit pattern: for the non-negative distances produced
    // here, `to_bits` preserves numeric order, so `u64`'s `Ord` can stand in for `f64`.
    let mut frontier: BinaryHeap<Reverse<(u64, usize)>> =
        BinaryHeap::from([Reverse((0.0f64.to_bits(), s))]);
    while let Some(Reverse((key, u))) = frontier.pop() {
        // Skip stale entries left behind after `u`'s distance was improved.
        if f64::from_bits(key) > best[u] {
            continue;
        }
        for (x, raw_weight) in snap.neighbors(u, dir) {
            let candidate = best[u] + raw_weight.max(0.0);
            if candidate < best[x] - TIE_EPS {
                // Strictly shorter: restart the path count and predecessor list for `x`.
                best[x] = candidate;
                paths[x] = paths[u];
                parents[x] = vec![u];
                frontier.push(Reverse((candidate.to_bits(), x)));
            } else if (candidate - best[x]).abs() < TIE_EPS {
                // Equally short: `u` is an additional predecessor of `x`.
                paths[x] += paths[u];
                parents[x].push(u);
            }
        }
    }
    (best, paths, parents)
}
/// Computes closeness centrality from unweighted (hop-count) BFS distances.
///
/// For each node `s`, let `r` be the number of other nodes reachable from `s` along
/// `direction`, and let `d` be the sum of their hop distances. The score is:
/// - `normalized = false`: `1 / d` (reciprocal farness);
/// - `normalized = true` (default): `r / d` (reciprocal of the mean distance to the nodes
///   `s` can reach).
///
/// When `wfImproved` is true (default), the score is additionally multiplied by `r / (n - 1)`.
/// This is the Wasserman–Faust correction, which down-weights nodes that reach only a small
/// part of the graph. A node that reaches nothing scores 0.
///
/// Parameters read from `params`: `normalized` (bool), `direction` (via `opt_direction`,
/// default `Direction::Out`), `wfImproved` (bool).
///
/// # Errors
/// Propagates any error from the `opt_*` parameter readers.
pub fn run_closeness(graph: &Graph, params: &HashMap<String, Value>) -> Result<Vec<Row>, DbError> {
    let normalized = opt_bool(params, "normalized", true)?;
    let direction = opt_direction(params, "direction", Direction::Out)?;
    let wf_improved = opt_bool(params, "wfImproved", true)?;
    let snap = GraphSnapshot::build(graph, None);
    let n = snap.n;
    if n == 0 {
        return Ok(vec![]);
    }
    let others = n as f64 - 1.0;

    // BFS buffers are reused for every source; the queue is always drained by the loop.
    let mut dist = vec![usize::MAX; n];
    let mut queue = VecDeque::with_capacity(n);
    let mut rows = Vec::with_capacity(n);
    for s in 0..n {
        dist.fill(usize::MAX);
        dist[s] = 0;
        queue.push_back(s);
        let mut reachable = 0usize;
        let mut sum_dist = 0usize;
        while let Some(v) = queue.pop_front() {
            let next = dist[v] + 1;
            for (w, _) in snap.neighbors(v, direction) {
                if dist[w] == usize::MAX {
                    dist[w] = next;
                    reachable += 1;
                    sum_dist += next;
                    queue.push_back(w);
                }
            }
        }

        // `reachable > 0` implies `sum_dist >= 1` and `n >= 2`, so both divisions are safe.
        let score = if reachable == 0 {
            0.0
        } else {
            let r = reachable as f64;
            let numerator = if normalized { r } else { 1.0 };
            let closeness = numerator / sum_dist as f64;
            if wf_improved {
                // Linear Wasserman–Faust factor: fraction of the other nodes that are reachable.
                closeness * (r / others)
            } else {
                closeness
            }
        };

        let mut row = HashMap::new();
        row.insert(
            "node".to_string(),
            Value::String(ulid_encode(snap.node_ids[s].0)),
        );
        row.insert("score".to_string(), Value::Float(score));
        rows.push(row);
    }
    Ok(rows)
}
/// Computes degree centrality for every node.
///
/// Parameters read from `params` (all optional):
/// - `normalized` (default `true`): divide the selected degree by `n - 1` (by 1 when the
///   graph has a single node).
/// - `direction` (default `"total"`): which degree becomes the `score` — `"in"`, `"out"`, or
///   `"total"` (in + out).
///
/// Each row carries `node`, `in_degree`, `out_degree`, `degree` (in + out) and `score`.
/// Degrees are the lengths of the snapshot's per-node adjacency lists.
///
/// # Errors
/// Returns `DbError::Query` when `direction` is not a string or is not one of the accepted
/// values, and propagates any error from reading `normalized`.
pub fn run_degree(graph: &Graph, params: &HashMap<String, Value>) -> Result<Vec<Row>, DbError> {
    // Which degree feeds the `score` column.
    #[derive(Clone, Copy)]
    enum Scored {
        In,
        Out,
        Total,
    }

    let normalized = opt_bool(params, "normalized", true)?;
    let scored = match params.get("direction") {
        None => Scored::Total,
        Some(Value::String(s)) => match s.as_str() {
            "in" => Scored::In,
            "out" => Scored::Out,
            "total" => Scored::Total,
            unknown => {
                return Err(DbError::Query(format!(
                    "parameter 'direction' must be \"in\", \"out\", or \"total\", got \"{unknown}\""
                )));
            }
        },
        Some(other) => {
            return Err(DbError::Query(format!(
                "parameter 'direction' must be a string, got {other:?}"
            )));
        }
    };

    let snap = GraphSnapshot::build(graph, None);
    let n = snap.n;
    if n == 0 {
        return Ok(vec![]);
    }
    // n >= 1 here; a lone node keeps its raw degree instead of dividing by zero.
    let divisor = n.saturating_sub(1).max(1) as f64;

    let mut rows = Vec::with_capacity(n);
    for i in 0..n {
        let in_deg = snap.adj_in[i].len();
        let out_deg = snap.adj_out[i].len();
        let degree = in_deg + out_deg;
        let chosen = match scored {
            Scored::In => in_deg,
            Scored::Out => out_deg,
            Scored::Total => degree,
        };
        let chosen = chosen as f64;
        let score = if normalized { chosen / divisor } else { chosen };

        let mut row = HashMap::new();
        row.insert(
            "node".to_string(),
            Value::String(ulid_encode(snap.node_ids[i].0)),
        );
        row.insert("in_degree".to_string(), Value::Int(in_deg as i64));
        row.insert("out_degree".to_string(), Value::Int(out_deg as i64));
        row.insert("degree".to_string(), Value::Int(degree as i64));
        row.insert("score".to_string(), Value::Float(score));
        rows.push(row);
    }
    Ok(rows)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::Graph;
    use crate::types::{Edge, Node, NodeId, Value};

    /// Inserts a fresh node labelled `N` with no properties and returns its id.
    fn make_node(g: &mut Graph) -> NodeId {
        let id = g.alloc_node_id();
        g.apply_insert_node(Node {
            id,
            labels: vec!["N".into()],
            properties: Default::default(),
        });
        id
    }

    /// Inserts a directed edge `from -> to` labelled `E` with no properties.
    fn make_edge(g: &mut Graph, from: NodeId, to: NodeId) {
        let id = g.alloc_edge_id();
        g.apply_insert_edge(Edge {
            id,
            from_node: from,
            to_node: to,
            label: "E".into(),
            properties: Default::default(),
            directed: true,
        });
    }

    /// Builds a hub with directed edges to three spokes; returns `(graph, hub, spokes)`.
    fn star_graph() -> (Graph, NodeId, Vec<NodeId>) {
        let mut g = Graph::new();
        let a = make_node(&mut g);
        let spokes: Vec<NodeId> = (0..3)
            .map(|_| {
                let n = make_node(&mut g);
                make_edge(&mut g, a, n);
                n
            })
            .collect();
        (g, a, spokes)
    }

    /// Finds the result row for `id`, panicking if the algorithm left the node out.
    fn row_of(rows: &[Row], id: NodeId) -> &Row {
        let key = Value::String(crate::types::ulid_encode(id.0));
        rows.iter()
            .find(|r| r["node"] == key)
            .expect("every node should have a result row")
    }

    /// Reads the `score` column of `id`'s row (0.0 if it is not a float).
    fn score_of(rows: &[Row], id: NodeId) -> f64 {
        match row_of(rows, id)["score"] {
            Value::Float(f) => f,
            _ => 0.0,
        }
    }

    #[test]
    fn pagerank_hub_has_highest_score() {
        let mut g = Graph::new();
        let a = make_node(&mut g);
        let b = make_node(&mut g);
        let c = make_node(&mut g);
        make_edge(&mut g, b, a);
        make_edge(&mut g, c, a);
        let params = HashMap::new();
        let rows = run_pagerank(&g, &params).unwrap();
        assert!(score_of(&rows, a) > score_of(&rows, b));
    }

    #[test]
    fn pagerank_scores_sum_to_one() {
        let (g, _, _) = star_graph();
        let params = HashMap::new();
        let rows = run_pagerank(&g, &params).unwrap();
        let sum: f64 = rows
            .iter()
            .map(|r| match r["score"] {
                Value::Float(f) => f,
                _ => 0.0,
            })
            .sum();
        assert!((sum - 1.0).abs() < 1e-6, "scores sum to {sum}");
    }

    #[test]
    fn pagerank_invalid_damping_errors() {
        let g = Graph::new();
        let params: HashMap<String, Value> =
            [("damping".to_string(), Value::Float(1.5))].into_iter().collect();
        assert!(run_pagerank(&g, &params).is_err());
    }

    #[test]
    fn pagerank_zero_iterations_errors() {
        let g = Graph::new();
        let params: HashMap<String, Value> =
            [("iterations".to_string(), Value::Int(0))].into_iter().collect();
        assert!(run_pagerank(&g, &params).is_err());
    }

    #[test]
    fn betweenness_bridge_node_highest() {
        let mut g = Graph::new();
        let a = make_node(&mut g);
        let b = make_node(&mut g);
        let c = make_node(&mut g);
        make_edge(&mut g, a, b);
        make_edge(&mut g, b, c);
        let params: HashMap<String, Value> =
            [("directed".to_string(), Value::Bool(false))].into_iter().collect();
        let rows = run_betweenness(&g, &params).unwrap();
        assert!(score_of(&rows, b) > score_of(&rows, a));
        assert!(score_of(&rows, b) > score_of(&rows, c));
    }

    #[test]
    fn betweenness_empty_graph() {
        let g = Graph::new();
        let rows = run_betweenness(&g, &HashMap::new()).unwrap();
        assert!(rows.is_empty());
    }

    #[test]
    fn closeness_central_node_highest() {
        let mut g = Graph::new();
        let a = make_node(&mut g);
        let b = make_node(&mut g);
        let c = make_node(&mut g);
        make_edge(&mut g, a, b);
        make_edge(&mut g, b, c);
        let params: HashMap<String, Value> =
            [("direction".to_string(), Value::String("any".into()))].into_iter().collect();
        let rows = run_closeness(&g, &params).unwrap();
        assert!(score_of(&rows, b) >= score_of(&rows, a));
    }

    #[test]
    fn degree_counts_correct() {
        let (g, hub, spokes) = star_graph();
        let params = HashMap::new();
        let rows = run_degree(&g, &params).unwrap();
        let hub_row = row_of(&rows, hub);
        assert_eq!(hub_row["out_degree"], Value::Int(3));
        assert_eq!(hub_row["in_degree"], Value::Int(0));
        for &spoke in &spokes {
            let spoke_row = row_of(&rows, spoke);
            assert_eq!(spoke_row["in_degree"], Value::Int(1));
            assert_eq!(spoke_row["out_degree"], Value::Int(0));
        }
    }

    #[test]
    fn degree_invalid_direction_errors() {
        let g = Graph::new();
        let params: HashMap<String, Value> = [(
            "direction".to_string(),
            Value::String("diagonal".into()),
        )]
        .into_iter()
        .collect();
        assert!(run_degree(&g, &params).is_err());
    }
}