pub mod preds {
    //! Stopping predicates for the PageRank computation: a predicate is
    //! evaluated after every iteration (via [`PredParams`]) and the
    //! computation stops as soon as it returns `true`.
    use anyhow::ensure;
    use predicates::{Predicate, reflection::PredicateReflection};
    use std::fmt::Display;

    /// Per-iteration state handed to stopping predicates.
    #[doc(hidden)]
    #[derive(Debug)]
    pub struct PredParams {
        /// Number of completed iterations.
        pub iteration: usize,
        /// Norm-based delta between the last two rank vectors.
        pub norm_delta: f64,
    }

    /// Stops the computation after a fixed number of iterations.
    #[derive(Debug, Clone)]
    pub struct MaxIter {
        max_iter: usize,
    }

    impl MaxIter {
        /// By default the iteration count alone never stops the computation.
        pub const DEFAULT_MAX_ITER: usize = usize::MAX;
    }

    impl From<usize> for MaxIter {
        fn from(max_iter: usize) -> Self {
            MaxIter { max_iter }
        }
    }

    impl Default for MaxIter {
        fn default() -> Self {
            Self::from(Self::DEFAULT_MAX_ITER)
        }
    }

    impl Display for MaxIter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_fmt(format_args!("(max iter: {})", self.max_iter))
        }
    }

    impl PredicateReflection for MaxIter {}

    impl Predicate<PredParams> for MaxIter {
        /// `true` once the iteration count reaches the configured maximum.
        fn eval(&self, pred_params: &PredParams) -> bool {
            pred_params.iteration >= self.max_iter
        }
    }

    /// Stops the computation when the norm delta drops to or below a threshold.
    #[derive(Debug, Clone)]
    pub struct L1Norm {
        threshold: f64,
    }

    impl L1Norm {
        /// Default convergence threshold.
        pub const DEFAULT_THRESHOLD: f64 = 1E-6;
    }

    impl TryFrom<Option<f64>> for L1Norm {
        type Error = anyhow::Error;

        /// Builds a threshold predicate, falling back to
        /// [`L1Norm::DEFAULT_THRESHOLD`] on `None`.
        ///
        /// # Errors
        ///
        /// Errors unless the threshold is a finite, strictly positive number.
        /// NaN and ±∞ are rejected: an infinite threshold would make the
        /// predicate unconditionally true and stop after one iteration.
        fn try_from(threshold: Option<f64>) -> anyhow::Result<Self> {
            Ok(match threshold {
                Some(threshold) => {
                    // `is_finite()` rejects NaN as well as ±∞ (the previous
                    // `!is_nan()` check silently accepted infinities).
                    ensure!(threshold.is_finite(), "The threshold must be finite");
                    ensure!(threshold > 0.0, "The threshold must be positive");
                    L1Norm { threshold }
                }
                None => Self::default(),
            })
        }
    }

    impl TryFrom<f64> for L1Norm {
        type Error = anyhow::Error;

        fn try_from(threshold: f64) -> anyhow::Result<Self> {
            // Delegate to the `Option<f64>` implementation above.
            Some(threshold).try_into()
        }
    }

    impl Default for L1Norm {
        fn default() -> Self {
            // The default threshold is finite and positive, so this cannot fail.
            Self::try_from(Self::DEFAULT_THRESHOLD).unwrap()
        }
    }

    impl Display for L1Norm {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_fmt(format_args!("(norm: {})", self.threshold))
        }
    }

    impl PredicateReflection for L1Norm {}

    impl Predicate<PredParams> for L1Norm {
        /// `true` once the norm delta is at or below the threshold.
        fn eval(&self, pred_params: &PredParams) -> bool {
            pred_params.norm_delta <= self.threshold
        }
    }
}
/// Flavor of (preferential) PageRank being computed; controls how dangling
/// nodes and the preference vector interact.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Mode {
    #[default]
    StronglyPreferential,
    WeaklyPreferential,
    PseudoRank,
}

impl std::fmt::Display for Mode {
    /// Writes the lowercase human-readable name of the mode.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Mode::StronglyPreferential => "strongly preferential",
            Mode::WeaklyPreferential => "weakly preferential",
            Mode::PseudoRank => "pseudorank",
        };
        f.write_str(label)
    }
}
use dsi_progress_logger::{ConcurrentProgressLog, ProgressLog, no_logging};
use kahan::KahanSum;
use lender::prelude::*;
use predicates::Predicate;
use rayon::iter::{
IndexedParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator,
};
use std::sync::{
Mutex,
atomic::{AtomicUsize, Ordering},
};
use sync_cell_slice::SyncSlice;
use webgraph::traits::RandomAccessGraph;
use webgraph::utils::Granularity;
/// Parallel PageRank computation driven by the *transpose* of a graph.
pub struct PageRank<'a, G: RandomAccessGraph + Sync> {
    /// Transpose of the graph whose ranks are computed: successors here are
    /// predecessors in the original graph.
    transpose: &'a G,
    /// Damping factor; must lie in [0..1) (enforced by the `alpha` setter).
    alpha: f64,
    /// Reciprocal outdegrees of the original graph, computed lazily on the
    /// first run; an entry of 0.0 marks a dangling node.
    inv_outdegrees: Option<Box<[f64]>>,
    /// Optional preference vector (`None` means uniform); expected to be
    /// stochastic — checked only in test builds via `assert_stochastic`.
    preference: Option<&'a [f64]>,
    /// Which flavor of preferential PageRank to compute.
    mode: Mode,
    /// Work-splitting granularity for the parallel per-node loop.
    granularity: Granularity,
    /// Norm delta of the last completed iteration (∞ before any iteration).
    norm_delta: f64,
    /// Current rank vector, one entry per node.
    rank: Box<[f64]>,
    /// Number of iterations completed in the last (or ongoing) run.
    iteration: usize,
}
impl<G: RandomAccessGraph + Sync> std::fmt::Debug for PageRank<'_, G> {
    /// Debug view of the scalar configuration and progress; the graph, rank
    /// vector, and other bulky fields are elided (`finish_non_exhaustive`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("PageRank");
        dbg.field("alpha", &self.alpha);
        dbg.field("mode", &self.mode);
        dbg.field("granularity", &self.granularity);
        dbg.field("norm_delta", &self.norm_delta);
        dbg.field("iteration", &self.iteration);
        dbg.finish_non_exhaustive()
    }
}
impl<'a, G: RandomAccessGraph + Sync> PageRank<'a, G> {
    /// Creates a new computation over the given transpose with default
    /// settings: α = 0.85, uniform preference, default [`Mode`] and
    /// [`Granularity`].
    pub fn new(transpose: &'a G) -> Self {
        let n = transpose.num_nodes();
        let rank = vec![0.0; n].into_boxed_slice();
        Self {
            transpose,
            alpha: 0.85,
            inv_outdegrees: None,
            preference: None,
            mode: Mode::default(),
            granularity: Granularity::default(),
            // No iteration has run yet, so the delta is unbounded.
            norm_delta: f64::INFINITY,
            rank,
            iteration: 0,
        }
    }

    /// Sets the damping factor.
    ///
    /// # Panics
    ///
    /// Panics if `alpha` is not in `[0..1)` (1.0 itself is excluded because
    /// the norm-delta scaling divides by `1 - alpha`).
    pub fn alpha(&mut self, alpha: f64) -> &mut Self {
        assert!(
            (0.0..1.0).contains(&alpha),
            "The damping factor must be in [0 . . 1), got {alpha}"
        );
        self.alpha = alpha;
        self
    }

    /// Sets (or clears, with `None`) the preference vector.
    ///
    /// # Panics
    ///
    /// Panics if the vector length does not match the number of nodes;
    /// in test builds it must also be stochastic.
    pub fn preference(&mut self, preference: Option<&'a [f64]>) -> &mut Self {
        if let Some(v) = preference {
            let n = self.transpose.num_nodes();
            assert_eq!(
                v.len(),
                n,
                "Preference vector length ({}) does not match the number of nodes ({n})",
                v.len()
            );
            // Stochasticity is only verified in test builds; release builds
            // trust the caller.
            #[cfg(test)]
            Self::assert_stochastic(v, "preference");
        }
        self.preference = preference;
        self
    }

    /// Sets the computation [`Mode`].
    pub fn mode(&mut self, mode: Mode) -> &mut Self {
        self.mode = mode;
        self
    }

    /// Sets the parallel work-splitting [`Granularity`].
    pub fn granularity(&mut self, granularity: Granularity) -> &mut Self {
        self.granularity = granularity;
        self
    }

    /// Returns the current rank vector (meaningful after [`run`](Self::run)).
    pub fn rank(&self) -> &[f64] {
        &self.rank
    }

    /// Returns the number of iterations completed by the last run.
    pub fn iterations(&self) -> usize {
        self.iteration
    }

    /// Returns the norm delta of the last completed iteration.
    pub fn norm_delta(&self) -> f64 {
        self.norm_delta
    }

    /// Runs the computation until `predicate` returns `true`, with no logging.
    pub fn run(&mut self, predicate: impl Predicate<preds::PredParams>) {
        self.run_with_logging(predicate, no_logging![], no_logging![]);
    }

    /// Runs the computation until `predicate` returns `true`, reporting
    /// progress on `pl` (sequential phases) and `cpl` (parallel per-node
    /// phase).
    ///
    /// Iterates over the transpose: each node pulls rank from its
    /// predecessors in the original graph. Ranks are updated in place, so a
    /// read of another node's rank may observe either the previous or the
    /// current iteration's value (a chaotic / Gauss–Seidel-like scheme —
    /// NOTE(review): exact convergence guarantees not verifiable from here).
    pub fn run_with_logging(
        &mut self,
        predicate: impl Predicate<preds::PredParams>,
        pl: &mut impl ProgressLog,
        cpl: &mut impl ConcurrentProgressLog,
    ) {
        let n = self.transpose.num_nodes();
        // Nothing to do on the empty graph (also avoids 1/0 below).
        if n == 0 {
            return;
        }
        log::info!("Mode: {}", self.mode);
        log::info!("Alpha: {}", self.alpha);
        log::info!(
            "Preference: {}",
            if self.preference.is_some() {
                "custom"
            } else {
                "uniform"
            }
        );
        log::info!("Stopping criterion: {}", predicate);
        self.iteration = 0;
        let inv_n = 1.0 / n as f64;
        // Initial rank: the preference vector if given, uniform otherwise.
        match self.preference {
            Some(v) => self.rank.copy_from_slice(v),
            None => self.rank.fill(inv_n),
        }
        // Lazily compute reciprocal outdegrees (cached across runs). Since we
        // iterate the transpose, each successor `j` seen here has `j -> i` in
        // the original graph, so `counts[j]` accumulates j's outdegree.
        let inv_outdegrees = self.inv_outdegrees.get_or_insert_with(|| {
            let mut counts = vec![0.0; n].into_boxed_slice();
            pl.item_name("node");
            pl.expected_updates(Some(n));
            pl.start("Computing outdegrees...");
            for_![(_, succ) in self.transpose.iter() {
                for j in succ {
                    counts[j] += 1.0;
                }
                pl.light_update();
            }];
            pl.done();
            pl.info(format_args!("Inverting outdegrees..."));
            // Invert in parallel; dangling nodes keep 0.0 as a sentinel.
            counts
                .par_iter_mut()
                .with_min_len(sux::RAYON_MIN_LEN)
                .for_each(|c| {
                    if *c != 0.0 {
                        *c = 1.0 / *c;
                    }
                });
            counts
        });
        pl.info(format_args!("Computing initial dangling rank..."));
        // Total rank currently sitting on dangling nodes (inv_outdegree 0.0),
        // accumulated with Kahan summation to limit floating-point error.
        let (dangling_count, dangling_rank) = inv_outdegrees
            .par_iter()
            .with_min_len(sux::RAYON_MIN_LEN)
            .enumerate()
            .filter(|&(_, &inv_d)| inv_d == 0.0)
            .fold(
                || (0usize, KahanSum::<f64>::new()),
                |(count, dangling_rank), (i, _)| (count + 1, dangling_rank + self.rank[i]),
            )
            .reduce(
                || (0usize, KahanSum::<f64>::new()),
                |(count0, rank0), (count1, rank1)| (count0 + count1, rank0 + rank1),
            );
        let mut dangling_rank = dangling_rank.sum();
        log::info!("{} dangling nodes", dangling_count);
        log::info!("Initial dangling rank: {}", dangling_rank);
        // Chunk size for the work-stealing cursor below; at least one node.
        let node_granularity = self
            .granularity
            .node_granularity(n, Some(self.transpose.num_arcs()))
            .max(1);
        pl.item_name("iteration");
        pl.expected_updates(None);
        pl.start(format!(
            "Computing PageRank (alpha={}, granularity={node_granularity})...",
            self.alpha
        ));
        loop {
            // Per-iteration accumulators, merged under a mutex once per thread.
            let norm_delta_accum = Mutex::new(0.0f64);
            let dangling_rank_accum = Mutex::new(0.0f64);
            // Shared cursor: threads claim disjoint chunks of node indices.
            let node_cursor = AtomicUsize::new(0);
            let rank_sync = self.rank.as_sync_slice();
            cpl.item_name("node");
            cpl.expected_updates(Some(n));
            cpl.start(format!("Iteration {}...", self.iteration + 1));
            // Run the update loop on every thread of the rayon pool.
            rayon::broadcast(|_| {
                let mut local_cpl = cpl.clone();
                let mut local_norm: KahanSum<f64> = KahanSum::new();
                let mut local_dangling: KahanSum<f64> = KahanSum::new();
                loop {
                    let start = node_cursor.fetch_add(node_granularity, Ordering::Relaxed);
                    if start >= n {
                        break;
                    }
                    let len = node_granularity.min(n - start);
                    for_![(i, succ) in self.transpose.iter_from(start).take(len) {
                        // SAFETY (unsynchronized access through SyncSlice):
                        // chunks are claimed atomically via `fetch_add`, so
                        // each `rank_sync[i]` is written by exactly one
                        // thread; concurrent reads of other entries may see
                        // old or new values, which this in-place scheme
                        // tolerates by construction (see method docs).
                        unsafe {
                            let mut sigma: KahanSum<f64> = KahanSum::new();
                            let mut has_loop = false;
                            // Pull rank from predecessors (successors in the
                            // transpose); a self-loop is excluded here and
                            // folded into the closed-form factor below.
                            for j in succ {
                                if j == i {
                                    has_loop = true;
                                } else {
                                    sigma += rank_sync[j].get() * inv_outdegrees[j];
                                }
                            }
                            // v_i: preference mass of node i.
                            let v_i = match self.preference {
                                Some(v) => v[i],
                                None => inv_n,
                            };
                            // u_i: weight used to redistribute dangling rank,
                            // depending on the mode (pseudorank drops it).
                            let u_i = match self.mode {
                                Mode::StronglyPreferential => v_i,
                                Mode::WeaklyPreferential => inv_n,
                                Mode::PseudoRank => 0.0,
                            };
                            // Node i's own contribution to itself (via a
                            // self-loop, or via dangling redistribution when
                            // i is dangling) is solved for in closed form:
                            // new_rank = (... + alpha * slf_term * new_rank)
                            // rearranged into a division by `self_loop_factor`.
                            let (self_dangling_rank, self_loop_factor) = if inv_outdegrees[i] == 0.0 {
                                let sdr = rank_sync[i].get();
                                let slf = if self.mode == Mode::PseudoRank {
                                    1.0
                                } else {
                                    1.0 - self.alpha * u_i
                                };
                                (sdr, slf)
                            } else {
                                let slf = if has_loop {
                                    1.0 - self.alpha * inv_outdegrees[i]
                                } else {
                                    1.0
                                };
                                (0.0, slf)
                            };
                            // Redistribute the dangling rank (minus i's own
                            // share, already handled by the factor above).
                            if self.mode != Mode::PseudoRank {
                                sigma += (dangling_rank - self_dangling_rank) * u_i;
                            }
                            let new_rank = ((1.0 - self.alpha) * v_i + self.alpha * sigma.sum())
                                / self_loop_factor;
                            if inv_outdegrees[i] == 0.0 {
                                local_dangling += new_rank;
                            }
                            // Accumulate the l1 change and publish the rank.
                            local_norm += (new_rank - rank_sync[i].get()).abs();
                            rank_sync[i].set(new_rank);
                        }
                    }];
                    local_cpl.update_with_count(len);
                }
                *norm_delta_accum.lock().unwrap() += local_norm.sum();
                *dangling_rank_accum.lock().unwrap() += local_dangling.sum();
            });
            cpl.done();
            dangling_rank = *dangling_rank_accum.lock().unwrap();
            // Scale the raw l1 delta by alpha / (1 - alpha) — presumably an
            // a-posteriori error bound; TODO confirm against the reference.
            self.norm_delta = *norm_delta_accum.lock().unwrap() * self.alpha / (1.0 - self.alpha);
            self.iteration += 1;
            log::info!(
                "Iteration {}: norm delta = {}",
                self.iteration,
                self.norm_delta
            );
            pl.update_and_display();
            // Stop as soon as the caller-supplied predicate is satisfied.
            if predicate.eval(&preds::PredParams {
                iteration: self.iteration,
                norm_delta: self.norm_delta,
            }) {
                break;
            }
        }
        pl.done();
    }

    /// Test-only check that `v` is a stochastic vector: non-negative entries
    /// summing to 1 within 1E-6. Panics with a message naming `name`.
    #[cfg(test)]
    fn assert_stochastic(v: &[f64], name: &str) {
        for (i, &x) in v.iter().enumerate() {
            assert!(
                x >= 0.0,
                "The {name} vector has a negative entry at index {i}: {x}"
            );
        }
        let sum: f64 = v.iter().sum();
        assert!(
            (sum - 1.0).abs() < 1E-6,
            "The {name} vector is not stochastic (sum = {sum})"
        );
    }
}