Skip to main content

oxicuda_graphalg/dynamic/
incremental_pagerank.rs

1//! Incremental PageRank over a streaming [`DynamicGraph`].
2//!
3//! A single edge insertion or deletion perturbs only a handful of rows of the
4//! Google matrix `G = d · (A D^{-1} + dangling teleport) + (1-d)/n · 1 1^T`, so
5//! the previous stationary distribution is an excellent warm start for the new
6//! one. [`IncrementalPageRank`] keeps the current rank vector and, after each
7//! batch of `apply_*` mutations, resumes the power iteration **from that vector**
8//! rather than from the uniform cold start. The fixed point of the power
9//! iteration is unique (the Google matrix is column-stochastic, irreducible and
10//! aperiodic for `0 < d < 1`), so warm-starting converges to *exactly* the same
11//! ranks an independent cold-start solve would — only faster.
12//!
13//! Dangling vertices (out-degree zero) redistribute their mass uniformly, and
14//! the result is renormalised to sum to one, matching
15//! [`crate::centrality::pagerank::pagerank`].
16
17use crate::dynamic::dynamic_graph::DynamicGraph;
18use crate::error::{GraphalgError, GraphalgResult};
19
20/// Stateful incremental PageRank engine bound to a mutable graph.
21#[derive(Debug, Clone)]
22pub struct IncrementalPageRank {
23    graph: DynamicGraph,
24    damping: f64,
25    max_iter: usize,
26    tol: f64,
27    /// Current rank vector (always normalised to sum one).
28    rank: Vec<f64>,
29    /// Iterations consumed by the most recent [`recompute`](Self::recompute).
30    last_iterations: usize,
31}
32
33impl IncrementalPageRank {
34    /// Create an engine over `graph` and converge an initial cold-start solve.
35    ///
36    /// * `damping` — the teleport-complement `d ∈ [0, 1]` (typically `0.85`).
37    /// * `max_iter` — power-iteration cap per (re)solve.
38    /// * `tol` — L1 convergence tolerance between successive iterates.
39    pub fn new(
40        graph: DynamicGraph,
41        damping: f64,
42        max_iter: usize,
43        tol: f64,
44    ) -> GraphalgResult<Self> {
45        if !(0.0..=1.0).contains(&damping) {
46            return Err(GraphalgError::InvalidParameter(format!(
47                "damping out of [0,1]: {damping}"
48            )));
49        }
50        if !tol.is_finite() || tol < 0.0 {
51            return Err(GraphalgError::InvalidParameter(format!(
52                "tolerance must be finite and non-negative: {tol}"
53            )));
54        }
55        let n = graph.num_nodes();
56        let rank = if n == 0 {
57            Vec::new()
58        } else {
59            vec![1.0 / n as f64; n]
60        };
61        let mut engine = Self {
62            graph,
63            damping,
64            max_iter,
65            tol,
66            rank,
67            last_iterations: 0,
68        };
69        // Cold start: iterate from the uniform vector to the fixed point.
70        engine.power_iterate();
71        Ok(engine)
72    }
73
74    /// Current (normalised) rank vector.
75    pub fn ranks(&self) -> &[f64] {
76        &self.rank
77    }
78
79    /// PageRank of a single vertex.
80    pub fn rank_of(&self, v: usize) -> GraphalgResult<f64> {
81        if v >= self.graph.num_nodes() {
82            return Err(GraphalgError::IndexOutOfBounds {
83                index: v,
84                len: self.graph.num_nodes(),
85            });
86        }
87        Ok(self.rank[v])
88    }
89
90    /// Iterations consumed by the most recent (re)solve — strictly informational,
91    /// useful for demonstrating the warm-start speed-up in tests/benchmarks.
92    pub fn last_iterations(&self) -> usize {
93        self.last_iterations
94    }
95
96    /// Borrow the underlying graph.
97    pub fn graph(&self) -> &DynamicGraph {
98        &self.graph
99    }
100
101    /// Insert a directed edge and re-converge from the warm start.
102    pub fn apply_insert(&mut self, u: usize, v: usize) -> GraphalgResult<()> {
103        self.graph.add_edge(u, v)?;
104        self.recompute();
105        Ok(())
106    }
107
108    /// Remove a directed edge and re-converge from the warm start.
109    pub fn apply_remove(&mut self, u: usize, v: usize) -> GraphalgResult<()> {
110        self.graph.remove_edge(u, v)?;
111        self.recompute();
112        Ok(())
113    }
114
115    /// Apply a batch of `(u, v, is_insert)` mutations atomically, re-converging
116    /// once at the end (cheaper than re-solving per edge).
117    pub fn apply_batch(&mut self, updates: &[(usize, usize, bool)]) -> GraphalgResult<()> {
118        for &(u, v, is_insert) in updates {
119            if is_insert {
120                self.graph.add_edge(u, v)?;
121            } else {
122                self.graph.remove_edge(u, v)?;
123            }
124        }
125        self.recompute();
126        Ok(())
127    }
128
129    /// Resume the power iteration from the current rank vector. Public so callers
130    /// can force a re-solve after mutating the graph through other means.
131    pub fn recompute(&mut self) {
132        let n = self.graph.num_nodes();
133        if self.rank.len() != n {
134            // Vertex count cannot change here, but stay defensive.
135            self.rank = if n == 0 {
136                Vec::new()
137            } else {
138                vec![1.0 / n as f64; n]
139            };
140        }
141        self.power_iterate();
142    }
143
144    /// Core warm-startable power iteration writing into `self.rank`.
145    fn power_iterate(&mut self) {
146        let n = self.graph.num_nodes();
147        self.last_iterations = 0;
148        if n == 0 {
149            return;
150        }
151        let n_f = n as f64;
152        // Out-degree (counting multiplicity) per vertex, precomputed once.
153        let mut out_deg = vec![0.0f64; n];
154        for u in 0..n {
155            out_deg[u] = self.graph.out_degree(u).unwrap_or(0) as f64;
156        }
157        let d = self.damping;
158        let mut cur = std::mem::take(&mut self.rank);
159        // Renormalise the warm start defensively (mutations never touch `rank`,
160        // but a caller-supplied state might be slightly off).
161        normalize(&mut cur, n_f);
162        let mut next = vec![0.0f64; n];
163        for _ in 0..self.max_iter {
164            let mut dangling = 0.0f64;
165            for v in 0..n {
166                if out_deg[v] == 0.0 {
167                    dangling += cur[v];
168                }
169            }
170            let teleport = (1.0 - d) / n_f + d * dangling / n_f;
171            for slot in next.iter_mut() {
172                *slot = teleport;
173            }
174            for u in 0..n {
175                let od = out_deg[u];
176                if od == 0.0 {
177                    continue;
178                }
179                // Multiplicity-aware: each parallel copy of (u,v) carries mass.
180                let base = d * cur[u] / od;
181                if let Ok(neigh) = self.graph.out_neighbors(u) {
182                    for &v in neigh {
183                        let m = self.graph.edge_multiplicity(u, v) as f64;
184                        next[v] += base * m;
185                    }
186                }
187            }
188            let diff: f64 = cur
189                .iter()
190                .zip(next.iter())
191                .map(|(a, b)| (a - b).abs())
192                .sum();
193            std::mem::swap(&mut cur, &mut next);
194            self.last_iterations += 1;
195            if diff < self.tol {
196                break;
197            }
198        }
199        normalize(&mut cur, n_f);
200        self.rank = cur;
201    }
202}
203
204/// Normalise `v` to sum one; fall back to uniform if the mass underflows.
205fn normalize(v: &mut [f64], n_f: f64) {
206    let s: f64 = v.iter().sum();
207    if s.abs() > 1e-300 {
208        for x in v.iter_mut() {
209            *x /= s;
210        }
211    } else if n_f > 0.0 {
212        let u = 1.0 / n_f;
213        for x in v.iter_mut() {
214            *x = u;
215        }
216    }
217}
218
219#[cfg(test)]
220mod tests {
221    use super::*;
222    use crate::centrality::pagerank::pagerank;
223    use crate::handle::LcgRng;
224
225    fn approx_vec(a: &[f64], b: &[f64], tol: f64) -> bool {
226        a.len() == b.len() && a.iter().zip(b).all(|(x, y)| (x - y).abs() < tol)
227    }
228
229    #[test]
230    fn matches_static_pagerank_initial() {
231        let mut g = DynamicGraph::new(4);
232        g.add_edge(0, 1).expect("e");
233        g.add_edge(1, 2).expect("e");
234        g.add_edge(2, 0).expect("e");
235        g.add_edge(2, 3).expect("e");
236        let engine = IncrementalPageRank::new(g.clone(), 0.85, 500, 1e-12).expect("new");
237        let stat = pagerank(&g.to_adjacency_list(), 0.85, 500, 1e-12).expect("static");
238        assert!(
239            approx_vec(engine.ranks(), &stat, 1e-9),
240            "incremental {:?} != static {:?}",
241            engine.ranks(),
242            stat
243        );
244        let s: f64 = engine.ranks().iter().sum();
245        assert!((s - 1.0).abs() < 1e-9);
246    }
247
248    #[test]
249    fn insert_then_match_static_recompute() {
250        let mut g = DynamicGraph::new(5);
251        g.add_edge(0, 1).expect("e");
252        g.add_edge(1, 2).expect("e");
253        g.add_edge(2, 3).expect("e");
254        g.add_edge(3, 4).expect("e");
255        let mut engine = IncrementalPageRank::new(g, 0.85, 1000, 1e-13).expect("new");
256        engine.apply_insert(4, 0).expect("insert"); // close the cycle
257        engine.apply_insert(0, 2).expect("insert");
258
259        // Independent cold-start solve on the mutated graph.
260        let stat =
261            pagerank(&engine.graph().to_adjacency_list(), 0.85, 1000, 1e-13).expect("static");
262        assert!(
263            approx_vec(engine.ranks(), &stat, 1e-8),
264            "after inserts incremental {:?} != static {:?}",
265            engine.ranks(),
266            stat
267        );
268    }
269
270    #[test]
271    fn remove_then_match_static_recompute() {
272        let mut g = DynamicGraph::new(4);
273        for &(u, v) in &[(0, 1), (1, 2), (2, 0), (2, 3), (3, 1)] {
274            g.add_edge(u, v).expect("e");
275        }
276        let mut engine = IncrementalPageRank::new(g, 0.85, 1000, 1e-13).expect("new");
277        engine.apply_remove(3, 1).expect("remove");
278        let stat =
279            pagerank(&engine.graph().to_adjacency_list(), 0.85, 1000, 1e-13).expect("static");
280        assert!(approx_vec(engine.ranks(), &stat, 1e-8));
281    }
282
283    #[test]
284    fn warm_start_is_cheaper_than_cold() {
285        // Build a sizeable random graph, converge, then perturb a single edge.
286        let mut rng = LcgRng::new(0xDEAD_BEEF);
287        let n = 60;
288        let mut g = DynamicGraph::new(n);
289        for u in 0..n {
290            for _ in 0..3 {
291                let v = rng.next_usize(n);
292                if v != u {
293                    g.add_edge(u, v).expect("e");
294                }
295            }
296        }
297        let mut engine = IncrementalPageRank::new(g, 0.85, 2000, 1e-12).expect("new");
298        let cold_iters = engine.last_iterations();
299        // One edge change: warm restart should need strictly fewer iterations.
300        engine.apply_insert(0, n - 1).expect("insert");
301        let warm_iters = engine.last_iterations();
302        assert!(
303            warm_iters <= cold_iters,
304            "warm {warm_iters} should not exceed cold {cold_iters}"
305        );
306        assert!(warm_iters >= 1);
307    }
308
309    #[test]
310    fn batch_matches_sequential_and_static() {
311        let mut rng = LcgRng::new(123_456);
312        let n = 30;
313        let mut g = DynamicGraph::new(n);
314        for u in 0..n {
315            let v = rng.next_usize(n);
316            if v != u {
317                g.add_edge(u, v).expect("e");
318            }
319        }
320        let mut engine = IncrementalPageRank::new(g, 0.85, 2000, 1e-13).expect("new");
321
322        // A batch of legal inserts (avoid removing absent edges).
323        let updates: Vec<(usize, usize, bool)> = (0..8)
324            .map(|_| {
325                let u = rng.next_usize(n);
326                let mut v = rng.next_usize(n);
327                if v == u {
328                    v = (v + 1) % n;
329                }
330                (u, v, true)
331            })
332            .collect();
333        engine.apply_batch(&updates).expect("batch");
334        let stat =
335            pagerank(&engine.graph().to_adjacency_list(), 0.85, 2000, 1e-13).expect("static");
336        assert!(
337            approx_vec(engine.ranks(), &stat, 1e-7),
338            "batch incremental != static"
339        );
340        let s: f64 = engine.ranks().iter().sum();
341        assert!((s - 1.0).abs() < 1e-8);
342    }
343
344    #[test]
345    fn dangling_vertex_handled() {
346        // Vertex 2 is a sink (dangling): its mass redistributes uniformly.
347        let mut g = DynamicGraph::new(3);
348        g.add_edge(0, 1).expect("e");
349        g.add_edge(1, 2).expect("e");
350        let engine = IncrementalPageRank::new(g.clone(), 0.85, 1000, 1e-13).expect("new");
351        let stat = pagerank(&g.to_adjacency_list(), 0.85, 1000, 1e-13).expect("static");
352        assert!(approx_vec(engine.ranks(), &stat, 1e-9));
353    }
354
355    #[test]
356    fn rejects_bad_damping() {
357        let g = DynamicGraph::new(3);
358        assert!(IncrementalPageRank::new(g, 1.5, 10, 1e-9).is_err());
359    }
360
361    #[test]
362    fn empty_graph_ok() {
363        let g = DynamicGraph::new(0);
364        let engine = IncrementalPageRank::new(g, 0.85, 10, 1e-9).expect("new");
365        assert!(engine.ranks().is_empty());
366    }
367}