use crate::parallel::partitioner::Partition;
use crate::node::NodeIndex;
use crate::vgi::VirtualGraph;
use std::time::Instant;
/// Configuration for [`DistributedPageRank`]: damping factor, iteration cap,
/// convergence tolerance and a sparse-mode flag.
#[derive(Debug, Clone)]
pub struct PageRankConfig {
/// Damping factor `d`; [`PageRankConfig::validate`] requires it to lie in `[0, 1]`.
pub damping: f64,
/// Upper bound on the number of update rounds; `validate` rejects 0.
pub max_iterations: usize,
/// Iteration stops once the largest per-node rank change in a round is below
/// this value; `validate` rejects negative values.
pub tolerance: f64,
/// Sparse-mode flag, set via `with_sparse` (defaults to `false`).
/// NOTE(review): not read by any computation in this file — confirm whether it is used elsewhere.
pub sparse: bool,
}
/// Borrowed inputs plus the output buffer for one partition's rank update,
/// bundled so `DistributedPageRank::compute_partition_pagerank_vec` takes a single argument.
struct PartitionPageRankParams<'a> {
/// Partition whose nodes receive a new rank in this call.
partition: &'a Partition,
/// Node index -> position in `ranks`/`new_ranks`; `usize::MAX` marks an index with no rank slot.
node_id_to_pos: &'a [usize],
/// Ranks from the previous iteration, indexed by position (read-only).
ranks: &'a [f64],
/// Buffer receiving this iteration's ranks, indexed by position.
new_ranks: &'a mut [f64],
/// Damping factor `d`.
damping: f64,
/// Constant `(1 - d) / N` that every node's new rank starts from.
teleport: f64,
/// `1 / out_degree` indexed by node index; `0.0` where no positive out-degree was found.
inv_degrees: &'a [f64],
}
/// Reasons a [`PageRankConfig`] can be rejected by [`PageRankConfig::validate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PageRankConfigError {
    /// `damping` is not within `[0, 1]`.
    InvalidDamping,
    /// `max_iterations` is zero.
    ZeroMaxIterations,
    /// `tolerance` is negative.
    NegativeTolerance,
}

impl std::fmt::Display for PageRankConfigError {
    /// Writes a one-line description of the violated constraint.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidDamping => "damping must be in [0, 1]",
            Self::ZeroMaxIterations => "max_iterations must be greater than 0",
            Self::NegativeTolerance => "tolerance must be non-negative",
        };
        f.write_str(text)
    }
}

impl std::error::Error for PageRankConfigError {}
impl Default for PageRankConfig {
    /// Damping 0.85, at most 20 iterations, tolerance 1e-6, sparse mode off.
    fn default() -> Self {
        let damping = 0.85;
        let max_iterations = 20;
        let tolerance = 1e-6;
        Self {
            sparse: false,
            tolerance,
            max_iterations,
            damping,
        }
    }
}
impl PageRankConfig {
    /// Builds a configuration from the three numeric parameters; `sparse` takes
    /// its `Default` value.
    ///
    /// No validation is performed; use [`PageRankConfig::try_new`] to reject
    /// out-of-range values.
    pub fn new(damping: f64, max_iterations: usize, tolerance: f64) -> Self {
        Self {
            damping,
            max_iterations,
            tolerance,
            ..Default::default()
        }
    }

    /// Checks that every numeric field is within its allowed range.
    ///
    /// # Errors
    /// - [`PageRankConfigError::InvalidDamping`] if `damping` is outside `[0, 1]` or NaN.
    /// - [`PageRankConfigError::ZeroMaxIterations`] if `max_iterations == 0`.
    /// - [`PageRankConfigError::NegativeTolerance`] if `tolerance` is negative or NaN.
    pub fn validate(&self) -> Result<(), PageRankConfigError> {
        // `contains` is false for NaN, whereas `d < 0.0 || d > 1.0` let NaN through.
        if !(0.0..=1.0).contains(&self.damping) {
            return Err(PageRankConfigError::InvalidDamping);
        }
        if self.max_iterations == 0 {
            return Err(PageRankConfigError::ZeroMaxIterations);
        }
        // A NaN tolerance makes every `diff < tolerance` check false, so a run could
        // never report convergence; reject it alongside negative values.
        if self.tolerance.is_nan() || self.tolerance < 0.0 {
            return Err(PageRankConfigError::NegativeTolerance);
        }
        Ok(())
    }

    /// Validating counterpart of [`PageRankConfig::new`].
    ///
    /// # Errors
    /// Returns the first error reported by [`PageRankConfig::validate`].
    pub fn try_new(
        damping: f64,
        max_iterations: usize,
        tolerance: f64,
    ) -> Result<Self, PageRankConfigError> {
        let config = Self::new(damping, max_iterations, tolerance);
        config.validate()?;
        Ok(config)
    }

    /// Builder-style setter for the `sparse` flag.
    pub fn with_sparse(mut self, sparse: bool) -> Self {
        self.sparse = sparse;
        self
    }
}
/// Output of `DistributedPageRank::compute`.
#[derive(Debug, Clone)]
pub struct PageRankResult {
    /// Maps a raw node index (`NodeIndex::index()`) to its position in
    /// `ranks`/`node_ids`; `usize::MAX` marks indices with no entry.
    pub node_id_to_pos: Vec<usize>,
    /// Final rank of each node, aligned with `node_ids`.
    pub ranks: Vec<f64>,
    /// Ranked nodes in position order.
    pub node_ids: Vec<NodeIndex>,
    /// Number of update rounds actually executed.
    pub iterations: usize,
    /// Whether the tolerance was reached before the iteration cap.
    pub converged: bool,
    /// Wall-clock duration of the computation in milliseconds.
    pub computation_time_ms: u64,
    /// One summary per input partition, in input order.
    pub partition_stats: Vec<PartitionPageRankStats>,
}

impl PageRankResult {
    /// Position of `node` in `ranks`/`node_ids`, or `None` if it has no entry.
    #[inline]
    fn get_pos(&self, node: NodeIndex) -> Option<usize> {
        self.node_id_to_pos
            .get(node.index())
            .copied()
            .filter(|&pos| pos != usize::MAX)
    }

    /// Returns the rank of `node`.
    ///
    /// # Errors
    /// `GraphError::NotFound` if `node` has no entry in this result.
    pub fn rank(&self, node: NodeIndex) -> Result<f64, GraphError> {
        let pos = self.get_pos(node).ok_or_else(|| {
            GraphError::NotFound(format!("Node {:?} not found in PageRank result", node))
        })?;
        self.ranks.get(pos).copied().ok_or_else(|| {
            GraphError::NotFound(format!("Node {:?} has no PageRank value", node))
        })
    }

    /// Returns the `k` highest-ranked nodes, sorted by rank in descending order.
    ///
    /// If `k` is at least the number of ranked nodes, every node is returned
    /// (still sorted). Ranks are ordered with [`f64::total_cmp`], so NaN values
    /// get a fixed position (positive NaN first) instead of breaking the comparator.
    pub fn top_k(&self, k: usize) -> Vec<(NodeIndex, f64)> {
        fn by_rank_desc(a: &(NodeIndex, f64), b: &(NodeIndex, f64)) -> std::cmp::Ordering {
            b.1.total_cmp(&a.1)
        }

        let mut nodes: Vec<(NodeIndex, f64)> = self
            .node_ids
            .iter()
            .copied()
            .zip(self.ranks.iter().copied())
            .collect();
        if k < nodes.len() {
            // O(n) selection: moves the k best entries to the front, in arbitrary order...
            nodes.select_nth_unstable_by(k, by_rank_desc);
            nodes.truncate(k);
        }
        // ...so sort what remains; previously only the `k >= len` path was ordered.
        nodes.sort_unstable_by(by_rank_desc);
        nodes
    }
}
/// Per-partition summary of the final ranks, built by `DistributedPageRank::compute`.
#[derive(Debug, Clone)]
pub struct PartitionPageRankStats {
/// `Partition::id` of the summarised partition.
pub partition_id: usize,
/// Number of entries in the partition's node list.
pub node_count: usize,
/// Number of entries in the partition's boundary-node list.
pub boundary_count: usize,
/// Sum of the ranks of the partition's ranked nodes (0.0 if there are none).
pub rank_sum: f64,
/// Largest rank in the partition; `f64::NEG_INFINITY` if it has no ranked nodes.
pub max_rank: f64,
/// Smallest rank in the partition; `f64::INFINITY` if it has no ranked nodes.
pub min_rank: f64,
}
/// Runs PageRank over a graph whose nodes are split into [`Partition`]s.
///
/// Partitions are processed one after another on the calling thread; they decide
/// which nodes receive a rank and how the per-partition statistics are grouped.
pub struct DistributedPageRank {
    config: PageRankConfig,
}

impl DistributedPageRank {
    /// Creates a runner from the damping factor, iteration cap and tolerance;
    /// remaining settings take their defaults.
    ///
    /// Values are not validated; build the config with `PageRankConfig::try_new`
    /// and pass it to [`DistributedPageRank::from_config`] to reject bad input.
    pub fn new(damping: f64, max_iterations: usize, tolerance: f64) -> Self {
        Self::from_config(PageRankConfig::new(damping, max_iterations, tolerance))
    }

    /// Creates a runner from a prebuilt configuration (not re-validated).
    pub fn from_config(config: PageRankConfig) -> Self {
        Self { config }
    }

    /// Computes PageRank for every distinct node listed in `partitions`.
    ///
    /// Each round sets, for every listed node `v`,
    /// `rank'(v) = (1 - d) / N + d * Σ rank(u) / out_degree(u)`, where `u` ranges
    /// over the nodes of `graph.neighbors(v)` that are listed in some partition and
    /// have a positive out-degree, and `N` is the number of distinct listed nodes.
    /// Iteration stops when the largest per-node change drops below `tolerance`
    /// or after `max_iterations` rounds.
    ///
    /// Notes:
    /// - Nodes listed in several partitions get a single rank slot and are counted once in `N`.
    /// - Rank held by nodes with out-degree 0 is not redistributed, so the ranks
    ///   need not sum to 1.
    /// - NOTE(review): the sum runs over `graph.neighbors(v)`. That is correct for
    ///   undirected graphs; for directed graphs confirm `neighbors` yields predecessors.
    /// - `config.sparse` is not consulted here.
    pub fn compute<G>(&self, graph: &G, partitions: &[Partition]) -> PageRankResult
    where
        G: VirtualGraph<NodeData = (), EdgeData = ()>,
    {
        let start_time = Instant::now();

        let (node_id_to_pos, node_ids) = Self::index_nodes(partitions);
        // Normalise by the number of *distinct* nodes. Summing partition sizes
        // over-counted nodes listed in more than one partition, skewing both the
        // initial rank and the teleport term.
        let total_nodes = node_ids.len();
        let initial_rank = 1.0 / total_nodes.max(1) as f64;
        let mut ranks: Vec<f64> = vec![initial_rank; total_nodes];
        let mut new_ranks: Vec<f64> = ranks.clone();
        let inv_degrees = Self::inverse_out_degrees(graph, &node_ids, node_id_to_pos.len());

        let damping = self.config.damping;
        let teleport = (1.0 - damping) / total_nodes.max(1) as f64;
        let mut converged = false;
        let mut iterations = 0;
        for iter in 0..self.config.max_iterations {
            iterations = iter + 1;
            for partition in partitions {
                let params = PartitionPageRankParams {
                    partition,
                    node_id_to_pos: &node_id_to_pos,
                    ranks: &ranks,
                    new_ranks: &mut new_ranks,
                    damping,
                    teleport,
                    inv_degrees: &inv_degrees,
                };
                self.compute_partition_pagerank_vec(graph, params);
            }
            // Largest absolute per-node change (L-infinity norm); NaN differences are skipped.
            let max_diff = ranks
                .iter()
                .zip(&new_ranks)
                .map(|(old, new)| (new - old).abs())
                .fold(0.0_f64, f64::max);
            std::mem::swap(&mut ranks, &mut new_ranks);
            if max_diff < self.config.tolerance {
                converged = true;
                break;
            }
        }

        // Saturate instead of silently truncating the u128 millisecond count.
        let computation_time_ms =
            u64::try_from(start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
        let partition_stats: Vec<PartitionPageRankStats> = partitions
            .iter()
            .map(|p| self.compute_partition_stats_vec(p, &node_id_to_pos, &ranks))
            .collect();
        PageRankResult {
            node_id_to_pos,
            ranks,
            node_ids,
            iterations,
            converged,
            computation_time_ms,
            partition_stats,
        }
    }

    /// Assigns a dense position to each distinct node in `partitions`, in first-seen order.
    ///
    /// Returns the `node index -> position` table (`usize::MAX` = not listed), sized
    /// to the largest node index + 1 (length 1 when there are no nodes), together
    /// with the nodes in position order.
    fn index_nodes(partitions: &[Partition]) -> (Vec<usize>, Vec<NodeIndex>) {
        let max_node_id = partitions
            .iter()
            .flat_map(|p| p.nodes.iter().map(|n| n.index()))
            .max()
            .unwrap_or(0);
        let listed = partitions.iter().map(|p| p.nodes.len()).sum::<usize>();
        let mut node_id_to_pos = vec![usize::MAX; max_node_id + 1];
        let mut node_ids = Vec::with_capacity(listed);
        for partition in partitions {
            for &node in &partition.nodes {
                let slot = &mut node_id_to_pos[node.index()];
                if *slot == usize::MAX {
                    *slot = node_ids.len();
                    node_ids.push(node);
                }
            }
        }
        (node_id_to_pos, node_ids)
    }

    /// Builds a table of length `len`, indexed by node index, holding
    /// `1 / out_degree` for each node in `node_ids` with a positive out-degree and
    /// `0.0` otherwise (including when `graph.out_degree` returns an error).
    fn inverse_out_degrees<G>(graph: &G, node_ids: &[NodeIndex], len: usize) -> Vec<f64>
    where
        G: VirtualGraph<NodeData = (), EdgeData = ()>,
    {
        let mut inv_degrees: Vec<f64> = vec![0.0; len];
        for &node in node_ids {
            if let Ok(out_degree) = graph.out_degree(node) {
                if out_degree > 0 {
                    inv_degrees[node.index()] = 1.0 / out_degree as f64;
                }
            }
        }
        inv_degrees
    }

    /// Writes one round's new rank for every node of `params.partition` into
    /// `params.new_ranks`, reading only the previous round's `params.ranks`.
    fn compute_partition_pagerank_vec<G>(&self, graph: &G, params: PartitionPageRankParams<'_>)
    where
        G: VirtualGraph<NodeData = (), EdgeData = ()>,
    {
        let PartitionPageRankParams {
            partition,
            node_id_to_pos,
            ranks,
            new_ranks,
            damping,
            teleport,
            inv_degrees,
        } = params;
        // Position of a node index in the rank buffers, or `None` if it has no slot.
        let position = |nid: usize| {
            node_id_to_pos
                .get(nid)
                .copied()
                .filter(|&pos| pos != usize::MAX)
        };
        for &node in &partition.nodes {
            let mut new_rank = teleport;
            for neighbor in graph.neighbors(node) {
                let nid = neighbor.index();
                if let Some(pos) = position(nid) {
                    let inv_out_degree = inv_degrees[nid];
                    // Nodes without out-edges contribute nothing (their mass is dropped).
                    if inv_out_degree > 0.0 {
                        new_rank += damping * ranks[pos] * inv_out_degree;
                    }
                }
            }
            if let Some(pos) = position(node.index()) {
                new_ranks[pos] = new_rank;
            }
        }
    }

    /// Summarises the final `ranks` of `partition`'s nodes.
    ///
    /// With no ranked nodes, `rank_sum` is `0.0` and `max_rank`/`min_rank` remain
    /// `f64::NEG_INFINITY`/`f64::INFINITY`.
    fn compute_partition_stats_vec(
        &self,
        partition: &Partition,
        node_id_to_pos: &[usize],
        ranks: &[f64],
    ) -> PartitionPageRankStats {
        let mut rank_sum = 0.0;
        let mut max_rank = f64::NEG_INFINITY;
        let mut min_rank = f64::INFINITY;
        for &node in &partition.nodes {
            let pos = node_id_to_pos
                .get(node.index())
                .copied()
                .filter(|&pos| pos != usize::MAX);
            if let Some(pos) = pos {
                let rank = ranks[pos];
                rank_sum += rank;
                if rank > max_rank {
                    max_rank = rank;
                }
                if rank < min_rank {
                    min_rank = rank;
                }
            }
        }
        PartitionPageRankStats {
            partition_id: partition.id,
            node_count: partition.nodes.len(),
            boundary_count: partition.boundary_nodes.len(),
            rank_sum,
            max_rank,
            min_rank,
        }
    }
}
/// Single-machine PageRank over every node of `graph`, without partitions.
///
/// Returns a vector of length `graph.node_count()` indexed by node index
/// (`NodeIndex::index()`). Each round sets
/// `rank'(v) = (1 - d) / N + d * Σ rank(u) / out_degree(u)` over `u` in
/// `graph.neighbors(v)`, stopping when the largest per-node change is below
/// `tolerance` or after `max_iterations` rounds.
///
/// Notes:
/// - Nodes whose index is `>= node_count()` are skipped instead of causing a panic.
/// - Rank held by nodes with out-degree 0 is not redistributed.
/// - `damping`/`tolerance` are not validated.
/// - NOTE(review): for directed graphs confirm `neighbors` yields predecessors.
pub fn simple_pagerank<G>(
    graph: &G,
    damping: f64,
    max_iterations: usize,
    tolerance: f64,
) -> Vec<f64>
where
    G: VirtualGraph<NodeData = (), EdgeData = ()>,
{
    let total_nodes = graph.node_count();
    let initial_rank = 1.0 / total_nodes.max(1) as f64;
    let teleport = (1.0 - damping) / total_nodes.max(1) as f64;
    let mut ranks: Vec<f64> = vec![initial_rank; total_nodes];
    let mut new_ranks: Vec<f64> = vec![initial_rank; total_nodes];

    // `1 / out_degree` per node index; 0.0 for nodes without out-edges.
    let mut inv_degrees: Vec<f64> = vec![0.0; total_nodes];
    for node_ref in graph.nodes() {
        let node = node_ref.index();
        // Bounds-checked like the rank reads/writes below: an index at or beyond
        // `node_count()` (e.g. a graph with gaps in its index space) used to panic here.
        if let Some(slot) = inv_degrees.get_mut(node.index()) {
            if let Ok(out_degree) = graph.out_degree(node) {
                if out_degree > 0 {
                    *slot = 1.0 / out_degree as f64;
                }
            }
        }
    }

    for _ in 0..max_iterations {
        for node_ref in graph.nodes() {
            let node = node_ref.index();
            let node_id = node.index();
            let mut new_rank = teleport;
            for neighbor in graph.neighbors(node) {
                let nid = neighbor.index();
                if let (Some(&rank), Some(&inv_out_degree)) = (ranks.get(nid), inv_degrees.get(nid)) {
                    if inv_out_degree > 0.0 {
                        new_rank += damping * rank * inv_out_degree;
                    }
                }
            }
            if let Some(slot) = new_ranks.get_mut(node_id) {
                *slot = new_rank;
            }
        }
        // Largest absolute per-node change; NaN differences are skipped.
        let max_diff = ranks
            .iter()
            .zip(&new_ranks)
            .map(|(old, new)| (new - old).abs())
            .fold(0.0_f64, f64::max);
        std::mem::swap(&mut ranks, &mut new_ranks);
        if max_diff < tolerance {
            break;
        }
    }
    ranks
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::traits::GraphOps;
    use crate::graph::Graph;
    use crate::parallel::partitioner::{HashPartitioner, Partitioner};

    #[test]
    fn test_pagerank_config() {
        let cfg = PageRankConfig::new(0.85, 50, 1e-8).with_sparse(true);
        assert!(cfg.sparse);
        assert_eq!(cfg.tolerance, 1e-8);
        assert_eq!(cfg.max_iterations, 50);
        assert_eq!(cfg.damping, 0.85);
    }

    /// Twenty nodes and no edges: the run must finish inside the iteration budget.
    #[test]
    fn test_distributed_pagerank_basic() {
        let mut g = Graph::<(), ()>::undirected();
        (0..20).for_each(|_| {
            g.add_node(()).unwrap();
        });
        let hashing = HashPartitioner::new(4);
        let parts = hashing.partition_graph(&g);
        let outcome = DistributedPageRank::new(0.85, 20, 1e-6).compute(&g, &parts);
        assert!(outcome.converged);
        assert!(outcome.iterations <= 20);
        assert_eq!(outcome.ranks.len(), 20);
    }

    /// Two mutually linked nodes are symmetric, so their ranks must agree.
    #[test]
    fn test_distributed_pagerank_convergence() {
        let mut g = Graph::<(), ()>::undirected();
        let a = g.add_node(()).unwrap();
        let b = g.add_node(()).unwrap();
        g.add_edge(a, b, ()).unwrap();
        g.add_edge(b, a, ()).unwrap();
        let hashing = HashPartitioner::new(2);
        let parts = hashing.partition_graph(&g);
        let outcome = DistributedPageRank::new(0.85, 100, 1e-10).compute(&g, &parts);
        assert!(outcome.converged);
        let (rank_a, rank_b) = (outcome.rank(a).unwrap(), outcome.rank(b).unwrap());
        assert!((rank_a - rank_b).abs() < 0.01);
    }

    #[test]
    fn test_pagerank_top_k() {
        let mut g = Graph::<(), ()>::undirected();
        (0..10).for_each(|_| {
            g.add_node(()).unwrap();
        });
        let hashing = HashPartitioner::new(2);
        let parts = hashing.partition_graph(&g);
        let outcome = DistributedPageRank::new(0.85, 20, 1e-6).compute(&g, &parts);
        let leaders = outcome.top_k(3);
        assert_eq!(leaders.len(), 3);
    }

    #[test]
    fn test_simple_pagerank() {
        let mut g = Graph::<(), ()>::undirected();
        let a = g.add_node(()).unwrap();
        let b = g.add_node(()).unwrap();
        g.add_edge(a, b, ()).unwrap();
        g.add_edge(b, a, ()).unwrap();
        let ranks = simple_pagerank(&g, 0.85, 20, 1e-6);
        let total: f64 = ranks.iter().sum();
        assert_eq!(ranks.len(), 2);
        assert!(total > 0.9);
    }

    #[test]
    fn test_partition_stats() {
        let mut g = Graph::<(), ()>::undirected();
        (0..100).for_each(|_| {
            g.add_node(()).unwrap();
        });
        let hashing = HashPartitioner::new(4);
        let parts = hashing.partition_graph(&g);
        let outcome = DistributedPageRank::new(0.85, 20, 1e-6).compute(&g, &parts);
        let counted: usize = outcome.partition_stats.iter().map(|s| s.node_count).sum();
        assert_eq!(outcome.partition_stats.len(), 4);
        assert_eq!(counted, 100);
    }
}