Skip to main content

scirs2_graph/streaming/
algorithms.rs

1//! Advanced streaming graph algorithms.
2//!
3//! This module provides algorithms that process massive graphs edge-by-edge
4//! without requiring the full graph to reside in memory.
5//!
6//! # Algorithms
7//!
8//! | Algorithm | Description |
9//! |-----------|-------------|
10//! | [`StreamingTriangleCounter`] | Reservoir + window exact/approximate triangle counting |
11//! | [`StreamingUnionFind`] | Online connected-components via path-compressed Union-Find |
12//! | [`streaming_bfs`] | Multi-pass BFS over an edge stream (memory-efficient) |
13//! | [`StreamingDegreeEstimator`] | Count-min sketch approximate degree queries |
14
15use std::collections::{HashMap, HashSet, VecDeque};
16
17// ────────────────────────────────────────────────────────────────────────────
18// StreamConfig
19// ────────────────────────────────────────────────────────────────────────────
20
/// Configuration for streaming graph algorithms.
///
/// The same configuration type is shared by [`StreamingTriangleCounter`] (which
/// reads `window_size`) and [`StreamingDegreeEstimator`] (which reads
/// `n_sketch_bits` and `seed`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamConfig {
    /// Number of recent edges kept in the sliding window (for triangle counting).
    pub window_size: usize,
    /// Requested bit-width of the count-min sketch rows.
    ///
    /// The effective row width is `w = 2^min(n_sketch_bits, 16)`: larger values
    /// are clamped by [`StreamingDegreeEstimator`] to keep memory bounded, so the
    /// default of 64 yields `w = 65536`.
    pub n_sketch_bits: usize,
    /// Base random seed from which the per-row hash seeds are derived.
    pub seed: u64,
}

impl Default for StreamConfig {
    /// A 1000-edge window, the widest permitted sketch (`64` clamps to 16 bits),
    /// and seed `42`.
    fn default() -> Self {
        Self {
            window_size: 1000,
            n_sketch_bits: 64,
            seed: 42,
        }
    }
}
41
42// ────────────────────────────────────────────────────────────────────────────
43// GraphStream
44// ────────────────────────────────────────────────────────────────────────────
45
/// An iterator-backed stream of graph edges `(u, v)`.
///
/// Use [`GraphStream::from_edges`] to create a stream from a vector, or
/// [`GraphStream::from_fn`] to wrap a lazy generator (e.g. reading a file
/// line-by-line without loading it all).
///
/// `GraphStream` also implements [`Iterator`], so it can be consumed with a
/// `for` loop or any iterator adaptor; [`GraphStream::next_edge`] performs the
/// same operation as [`Iterator::next`].
pub struct GraphStream {
    inner: Box<dyn Iterator<Item = (usize, usize)>>,
}

impl GraphStream {
    /// Create a [`GraphStream`] from an owned vector of edges.
    ///
    /// Edges are yielded in vector order.
    pub fn from_edges(edges: Vec<(usize, usize)>) -> Self {
        Self {
            inner: Box::new(edges.into_iter()),
        }
    }

    /// Create a [`GraphStream`] from a closure that produces `Some((u, v))` or `None`.
    ///
    /// The closure is invoked once per call to [`GraphStream::next_edge`]; a
    /// `None` signals that the stream is exhausted.
    pub fn from_fn(f: impl FnMut() -> Option<(usize, usize)> + 'static) -> Self {
        // `std::iter::from_fn` takes the closure by value and calls it through
        // its own `&mut`, so the parameter itself does not need to be `mut`.
        Self {
            inner: Box::new(std::iter::from_fn(f)),
        }
    }

    /// Advance the stream by one edge.  Returns `None` when exhausted.
    pub fn next_edge(&mut self) -> Option<(usize, usize)> {
        self.inner.next()
    }
}

impl Iterator for GraphStream {
    type Item = (usize, usize);

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

impl std::fmt::Debug for GraphStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The boxed iterator is opaque, so only the type name is shown.
        f.debug_struct("GraphStream").finish_non_exhaustive()
    }
}
75
76// ────────────────────────────────────────────────────────────────────────────
77// StreamingTriangleCounter
78// ────────────────────────────────────────────────────────────────────────────
79
80/// Streaming triangle counter using a sliding adjacency window.
81///
82/// For each new edge `(u, v)` the algorithm counts common neighbours already
83/// present in the current window, giving an exact count for edges within the
84/// window and an approximate count when the stream overflows it.
85///
86/// For small graphs (total edges ≤ `window_size`) this is an exact counter.
87///
88/// # References
89/// Buriol et al., "Counting Triangles in Data Streams", PODS 2006.
90#[derive(Debug)]
91pub struct StreamingTriangleCounter {
92    /// Partial adjacency maintained for the current window.
93    adjacency_sketch: HashMap<usize, HashSet<usize>>,
94    /// Accumulated triangle estimate.
95    triangle_count: f64,
96    /// Number of edges processed so far.
97    n_edges: usize,
98    /// Sliding reservoir of recent edges (FIFO, bounded by `window_size`).
99    reservoir: VecDeque<(usize, usize)>,
100    /// Maximum window size.
101    window_size: usize,
102}
103
104impl StreamingTriangleCounter {
105    /// Create a new counter with the given configuration.
106    pub fn new(config: StreamConfig) -> Self {
107        Self {
108            adjacency_sketch: HashMap::new(),
109            triangle_count: 0.0,
110            n_edges: 0,
111            reservoir: VecDeque::new(),
112            window_size: config.window_size,
113        }
114    }
115
116    /// Process a single edge from the stream.
117    ///
118    /// New triangles formed by `(u, v)` together with any common neighbour `w`
119    /// already present in the adjacency sketch are counted.  When the reservoir
120    /// is full the oldest edge is evicted and removed from the sketch.
121    pub fn process_edge(&mut self, u: usize, v: usize) {
122        self.n_edges += 1;
123
124        // Count triangles closed by this edge: |N(u) ∩ N(v)|
125        let neighbours_u: HashSet<usize> =
126            self.adjacency_sketch.get(&u).cloned().unwrap_or_default();
127        let neighbours_v: HashSet<usize> =
128            self.adjacency_sketch.get(&v).cloned().unwrap_or_default();
129
130        let common = neighbours_u.intersection(&neighbours_v).count();
131
132        // Scale factor: when the window is full we are counting on a sample of
133        // size `window_size / n_edges` of all edges, so scale up by (n/m)^2.
134        let scale = if self.n_edges <= self.window_size {
135            1.0
136        } else {
137            let m = self.window_size as f64;
138            let n = self.n_edges as f64;
139            (n / m) * (n / m)
140        };
141        self.triangle_count += common as f64 * scale;
142
143        // Add edge to sketch (undirected)
144        self.adjacency_sketch.entry(u).or_default().insert(v);
145        self.adjacency_sketch.entry(v).or_default().insert(u);
146        self.reservoir.push_back((u, v));
147
148        // Evict oldest edge when window overflows
149        if self.reservoir.len() > self.window_size {
150            if let Some((eu, ev)) = self.reservoir.pop_front() {
151                if let Some(set) = self.adjacency_sketch.get_mut(&eu) {
152                    set.remove(&ev);
153                }
154                if let Some(set) = self.adjacency_sketch.get_mut(&ev) {
155                    set.remove(&eu);
156                }
157            }
158        }
159    }
160
161    /// Return the current triangle estimate.
162    pub fn estimate_triangles(&self) -> f64 {
163        self.triangle_count
164    }
165
166    /// Drive the counter over an entire [`GraphStream`], returning the final estimate.
167    pub fn process_stream(&mut self, stream: &mut GraphStream) -> f64 {
168        while let Some((u, v)) = stream.next_edge() {
169            self.process_edge(u, v);
170        }
171        self.estimate_triangles()
172    }
173}
174
175// ────────────────────────────────────────────────────────────────────────────
176// StreamingBFS
177// ────────────────────────────────────────────────────────────────────────────
178
/// Tuning knobs for [`streaming_bfs`].
#[derive(Debug, Clone)]
pub struct StreamingBfsConfig {
    /// Vertex the traversal starts from (recorded at distance 0).
    pub source: usize,
    /// Largest distance that will be assigned (inclusive); nothing further is explored.
    pub max_dist: usize,
    /// Soft cap on how many vertices may be recorded in the distance map.
    pub memory_limit: usize,
}

impl Default for StreamingBfsConfig {
    /// Start at vertex 0 with no distance bound and room for 10 000 vertices.
    fn default() -> Self {
        let source = 0;
        let max_dist = usize::MAX;
        let memory_limit = 10_000;
        StreamingBfsConfig {
            source,
            max_dist,
            memory_limit,
        }
    }
}

/// Outcome of a [`streaming_bfs`] run.
#[derive(Debug, Clone)]
pub struct StreamBfsResult {
    /// Hop distance from the source for every vertex that was reached.
    pub distances: HashMap<usize, usize>,
    /// How many scans over the buffered edge list were performed.
    pub n_passes: usize,
    /// How many distinct vertices received a distance (the size of `distances`).
    pub n_vertices_reached: usize,
}
210
211/// Memory-efficient multi-pass BFS over a [`GraphStream`].
212///
213/// Because a streaming BFS cannot rewind the stream, we work over a snapshot
214/// of edges (the stream is fully consumed once and stored as an edge list).
215/// Pass `k` discovers all vertices at distance `k` from the source.  Only the
216/// current frontier and the visited set need to be kept in memory.
217///
218/// # Algorithm
219/// 1. Consume the stream into an edge list (required for multi-pass).
220/// 2. Pass 0: initialise `visited = {source}`, `frontier = {source}`, `dist[source] = 0`.
221/// 3. Pass k: scan every edge; if exactly one endpoint is in the frontier and
222///    the other is unvisited, add the other to the next frontier at distance k+1.
223/// 4. Repeat until no new vertices are discovered or `max_dist` is reached.
224pub fn streaming_bfs(stream: &mut GraphStream, config: &StreamingBfsConfig) -> StreamBfsResult {
225    // Collect all edges for multi-pass traversal
226    let mut edges: Vec<(usize, usize)> = Vec::new();
227    while let Some(e) = stream.next_edge() {
228        edges.push(e);
229    }
230
231    let source = config.source;
232    let mut distances: HashMap<usize, usize> = HashMap::new();
233    distances.insert(source, 0);
234
235    let mut frontier: HashSet<usize> = HashSet::new();
236    frontier.insert(source);
237
238    let mut n_passes = 0usize;
239    let mut current_dist = 0usize;
240
241    while !frontier.is_empty() && current_dist < config.max_dist {
242        let mut next_frontier: HashSet<usize> = HashSet::new();
243        // Scan the full edge list for edges crossing from frontier to unvisited
244        for &(u, v) in &edges {
245            // Check both directions (undirected stream)
246            for &(a, b) in &[(u, v), (v, u)] {
247                if frontier.contains(&a)
248                    && !distances.contains_key(&b)
249                    && distances.len() < config.memory_limit
250                {
251                    distances.insert(b, current_dist + 1);
252                    next_frontier.insert(b);
253                }
254            }
255        }
256        n_passes += 1;
257        current_dist += 1;
258        frontier = next_frontier;
259    }
260
261    let n_vertices_reached = distances.len();
262    StreamBfsResult {
263        distances,
264        n_passes,
265        n_vertices_reached,
266    }
267}
268
269// ────────────────────────────────────────────────────────────────────────────
270// StreamingDegreeEstimator (count-min sketch)
271// ────────────────────────────────────────────────────────────────────────────
272
273/// Streaming degree estimator backed by a count-min sketch.
274///
275/// Each call to `process_edge` increments the sketch cell for both endpoints.
276/// The sketch uses `d` independent hash functions, each with width `w` determined
277/// by `n_sketch_bits` (w = min(2^n_sketch_bits, 2^16) to keep memory bounded).
278/// Degree queries return the minimum over all `d` rows.
279///
280/// # Space
281/// O(d × w) counters where d = 4 and w = 2^min(n_sketch_bits, 16).
282#[derive(Debug)]
283pub struct StreamingDegreeEstimator {
284    /// count_min[row][col] — d rows × w columns.
285    count_min: Vec<Vec<u32>>,
286    /// Number of hash functions (rows).
287    d: usize,
288    /// Width of each row.
289    w: usize,
290    /// Number of edges processed.
291    n_edges: usize,
292    /// Hash seeds, one per row.
293    seeds: Vec<u64>,
294}
295
296impl StreamingDegreeEstimator {
297    /// Create a new estimator with the given configuration.
298    pub fn new(config: StreamConfig) -> Self {
299        let d = 4usize;
300        // Cap width at 2^16 = 65536 to keep memory sensible
301        let bits = config.n_sketch_bits.min(16);
302        let w = 1usize << bits;
303
304        // Derive d different seeds from the base seed
305        let seeds: Vec<u64> = (0..d)
306            .map(|i| {
307                config
308                    .seed
309                    .wrapping_add((i as u64).wrapping_mul(0x9e37_79b9_7f4a_7c15))
310            })
311            .collect();
312
313        Self {
314            count_min: vec![vec![0u32; w]; d],
315            d,
316            w,
317            n_edges: 0,
318            seeds,
319        }
320    }
321
322    /// Hash a vertex id using seed `s` to a column index in `[0, w)`.
323    fn hash_vertex(&self, vertex: usize, seed: u64) -> usize {
324        // FNV-1a inspired mix
325        let mut h = seed ^ (vertex as u64).wrapping_mul(0x9e37_79b9_7f4a_7c15);
326        h ^= h >> 33;
327        h = h.wrapping_mul(0xff51_afd7_ed55_8ccd);
328        h ^= h >> 33;
329        h = h.wrapping_mul(0xc4ce_b9fe_1a85_ec53);
330        h ^= h >> 33;
331        (h as usize) % self.w
332    }
333
334    /// Process a single edge `(u, v)` — increments degree counters for both endpoints.
335    pub fn process_edge(&mut self, u: usize, v: usize) {
336        self.n_edges += 1;
337        for row in 0..self.d {
338            let seed = self.seeds[row];
339            let col_u = self.hash_vertex(u, seed);
340            let col_v = self.hash_vertex(v, seed);
341            self.count_min[row][col_u] = self.count_min[row][col_u].saturating_add(1);
342            self.count_min[row][col_v] = self.count_min[row][col_v].saturating_add(1);
343        }
344    }
345
346    /// Estimate the degree of `vertex` — returns the minimum counter across rows.
347    pub fn estimate_degree(&self, vertex: usize) -> u32 {
348        (0..self.d)
349            .map(|row| {
350                let col = self.hash_vertex(vertex, self.seeds[row]);
351                self.count_min[row][col]
352            })
353            .min()
354            .unwrap_or(0)
355    }
356
357    /// Return estimated degrees for each vertex id in `0..n_vertices`.
358    pub fn approximate_degree_distribution(&self, n_vertices: usize) -> Vec<u32> {
359        (0..n_vertices).map(|v| self.estimate_degree(v)).collect()
360    }
361}
362
363// ────────────────────────────────────────────────────────────────────────────
364// StreamingUnionFind
365// ────────────────────────────────────────────────────────────────────────────
366
/// Online connected-components via a path-compressed, union-by-rank Union-Find.
///
/// Each incoming edge merges the components of its endpoints in O(α(n))
/// amortised time (inverse Ackermann).  Vertices are registered lazily: any
/// vertex passed to [`find`](Self::find), [`process_edge`](Self::process_edge)
/// or [`component_id`](Self::component_id) is added as a singleton component
/// if it has not been seen before.
#[derive(Debug, Default)]
pub struct StreamingUnionFind {
    /// parent[x] = parent of x; x is a root when parent[x] == x.
    parent: HashMap<usize, usize>,
    /// rank[x] = upper bound on the height of the tree rooted at x.
    rank: HashMap<usize, usize>,
}

impl StreamingUnionFind {
    /// Create an empty Union-Find structure.
    pub fn new() -> Self {
        Self::default()
    }

    /// Register `x` as its own singleton set (rank 0) unless it already exists.
    fn make_set(&mut self, x: usize) {
        self.parent.entry(x).or_insert(x);
        self.rank.entry(x).or_default();
    }

    /// Return the root of `x`'s tree, re-pointing every node on the way
    /// directly at that root (full path compression, no recursion).
    pub fn find(&mut self, x: usize) -> usize {
        self.make_set(x);

        // First pass: climb parent links until a self-parented node is found.
        let mut root = x;
        while let Some(&up) = self.parent.get(&root) {
            if up == root {
                break;
            }
            root = up;
        }

        // Second pass: walk the same path again, hanging each node off `root`.
        let mut cursor = x;
        while cursor != root {
            let up = self.parent.get(&cursor).copied().unwrap_or(root);
            self.parent.insert(cursor, root);
            cursor = up;
        }
        root
    }

    /// Merge the components containing `u` and `v` (no-op if already joined).
    ///
    /// The shallower tree is attached beneath the deeper one; on a rank tie,
    /// `v`'s root goes under `u`'s root and `u`'s root gains one rank.
    pub fn process_edge(&mut self, u: usize, v: usize) {
        // `find` registers both endpoints before resolving their roots.
        let root_u = self.find(u);
        let root_v = self.find(v);
        if root_u == root_v {
            return;
        }

        let rank_u = self.rank.get(&root_u).copied().unwrap_or(0);
        let rank_v = self.rank.get(&root_v).copied().unwrap_or(0);
        let (child, new_root) = if rank_u < rank_v {
            (root_u, root_v)
        } else {
            (root_v, root_u)
        };
        self.parent.insert(child, new_root);
        if rank_u == rank_v {
            self.rank.insert(root_u, rank_u + 1);
        }
    }

    /// Count the distinct components among all registered vertices.
    pub fn n_components(&self) -> usize {
        self.parent
            .iter()
            .filter(|&(node, parent)| node == parent)
            .count()
    }

    /// Return the canonical component identifier (the root) for `x`.
    pub fn component_id(&mut self, x: usize) -> usize {
        self.find(x)
    }
}
454
455// ────────────────────────────────────────────────────────────────────────────
456// Tests
457// ────────────────────────────────────────────────────────────────────────────
458
#[cfg(test)]
mod tests {
    use super::*;

    // ── Triangle counting: K4 has exactly 4 triangles ────────────────────────
    #[test]
    fn test_streaming_triangle_k4() {
        // K4 edges (6 total)
        let edges = vec![(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)];
        let config = StreamConfig {
            window_size: 100,
            ..Default::default()
        };
        let mut counter = StreamingTriangleCounter::new(config);
        let mut stream = GraphStream::from_edges(edges);
        let estimate = counter.process_stream(&mut stream);
        // All 6 edges fit in the window, so no scaling is applied and the
        // count must be exact.
        assert!(
            (estimate - 4.0).abs() < 1e-9,
            "Expected exactly 4.0 triangles, got {estimate}"
        );
    }

    // ── Triangle counting: a path is triangle-free ───────────────────────────
    #[test]
    fn test_streaming_triangle_path_is_triangle_free() {
        let edges = vec![(0, 1), (1, 2), (2, 3), (3, 4)];
        let mut counter = StreamingTriangleCounter::new(StreamConfig::default());
        let estimate = counter.process_stream(&mut GraphStream::from_edges(edges));
        assert_eq!(estimate, 0.0, "A path has no triangles, got {estimate}");
    }

    // ── StreamingUnionFind: path graph is one component ──────────────────────
    #[test]
    fn test_streaming_union_find_path_graph() {
        let mut uf = StreamingUnionFind::new();
        // Path 0-1-2-3-4
        for i in 0..4usize {
            uf.process_edge(i, i + 1);
        }
        assert_eq!(uf.n_components(), 1, "Path graph should be one component");
        // All vertices should share the same component id
        let c0 = uf.component_id(0);
        for v in 1..5usize {
            assert_eq!(uf.component_id(v), c0);
        }
    }

    // ── StreamingUnionFind: disconnected graph has correct component count ───
    #[test]
    fn test_streaming_union_find_disconnected() {
        let mut uf = StreamingUnionFind::new();
        // Three disconnected edges: (0,1), (2,3), (4,5)
        uf.process_edge(0, 1);
        uf.process_edge(2, 3);
        uf.process_edge(4, 5);
        assert_eq!(uf.n_components(), 3);
        assert_ne!(uf.component_id(0), uf.component_id(2));
    }

    // ── StreamingUnionFind: repeated / reversed edges are idempotent ─────────
    #[test]
    fn test_streaming_union_find_repeated_edges() {
        let mut uf = StreamingUnionFind::new();
        uf.process_edge(0, 1);
        uf.process_edge(1, 0);
        uf.process_edge(0, 1);
        assert_eq!(uf.n_components(), 1);
        assert_eq!(uf.component_id(0), uf.component_id(1));
    }

    // ── StreamingBFS: small graph ─────────────────────────────────────────────
    #[test]
    fn test_streaming_bfs_small_graph() {
        // Graph: 0-1, 1-2, 2-3, 1-3
        let edges = vec![(0, 1), (1, 2), (2, 3), (1, 3)];
        let mut stream = GraphStream::from_edges(edges);
        let config = StreamingBfsConfig {
            source: 0,
            ..Default::default()
        };
        let result = streaming_bfs(&mut stream, &config);
        assert_eq!(result.distances[&0], 0);
        assert_eq!(result.distances[&1], 1);
        assert_eq!(result.distances[&2], 2);
        assert_eq!(result.distances[&3], 2);
        assert_eq!(result.n_vertices_reached, 4);
        // Two discovering passes plus one final pass that finds nothing.
        assert_eq!(result.n_passes, 3);
    }

    // ── StreamingBFS: single source, star graph ───────────────────────────────
    #[test]
    fn test_streaming_bfs_star() {
        // Star: center 0, leaves 1..=5
        let edges: Vec<(usize, usize)> = (1..=5).map(|i| (0, i)).collect();
        let mut stream = GraphStream::from_edges(edges);
        let config = StreamingBfsConfig {
            source: 0,
            ..Default::default()
        };
        let result = streaming_bfs(&mut stream, &config);
        assert_eq!(result.distances[&0], 0);
        for leaf in 1..=5usize {
            assert_eq!(result.distances[&leaf], 1);
        }
    }

    // ── StreamingBFS: max_dist bounds the explored radius (inclusive) ────────
    #[test]
    fn test_streaming_bfs_max_dist() {
        let edges = vec![(0, 1), (1, 2), (2, 3)];
        let config = StreamingBfsConfig {
            source: 0,
            max_dist: 1,
            ..Default::default()
        };
        let result = streaming_bfs(&mut GraphStream::from_edges(edges), &config);
        assert_eq!(result.distances[&1], 1);
        assert!(!result.distances.contains_key(&2));
        assert_eq!(result.n_passes, 1);
    }

    // ── StreamingBFS: memory_limit caps the number of recorded vertices ──────
    #[test]
    fn test_streaming_bfs_memory_limit() {
        let edges: Vec<(usize, usize)> = (1..=5).map(|i| (0, i)).collect();
        let config = StreamingBfsConfig {
            source: 0,
            memory_limit: 3,
            ..Default::default()
        };
        let result = streaming_bfs(&mut GraphStream::from_edges(edges), &config);
        assert_eq!(result.n_vertices_reached, 3);
        assert_eq!(result.distances[&0], 0);
    }

    // ── DegreeEstimator: known graph degrees ─────────────────────────────────
    #[test]
    fn test_degree_estimator_known_degrees() {
        // Build a graph where vertex 0 has degree 4
        // Edges: 0-1, 0-2, 0-3, 0-4
        let edges = vec![(0, 1), (0, 2), (0, 3), (0, 4)];
        let config = StreamConfig {
            n_sketch_bits: 8,
            ..Default::default()
        };
        let mut estimator = StreamingDegreeEstimator::new(config);
        for (u, v) in &edges {
            estimator.process_edge(*u, *v);
        }
        let est_deg_0 = estimator.estimate_degree(0);
        // A count-min sketch never underestimates: collisions can only inflate.
        assert!(
            est_deg_0 >= 4,
            "Degree estimate for vertex 0 should be >= 4 (true=4), got {est_deg_0}"
        );
        // Leaves have degree 1
        for v in 1..=4usize {
            let est = estimator.estimate_degree(v);
            assert!(
                est >= 1,
                "Degree estimate for leaf {v} should be >= 1, got {est}"
            );
        }
    }

    // ── DegreeEstimator: degree distribution ─────────────────────────────────
    #[test]
    fn test_degree_estimator_distribution() {
        // Path 0-1-2-3-4: degrees are 1,2,2,2,1
        let edges = vec![(0, 1), (1, 2), (2, 3), (3, 4)];
        let true_degrees = [1u32, 2, 2, 2, 1];
        let config = StreamConfig {
            n_sketch_bits: 8,
            ..Default::default()
        };
        let mut estimator = StreamingDegreeEstimator::new(config);
        for (u, v) in &edges {
            estimator.process_edge(*u, *v);
        }
        let dist = estimator.approximate_degree_distribution(5);
        // Lower bound: never below the true degree.  Upper bound: each row's
        // counters sum to 2·|E| = 8, so no single counter can exceed 8.
        for (v, &est) in dist.iter().enumerate() {
            assert!(
                est >= true_degrees[v],
                "Vertex {v} degree estimate {est} should be >= {}",
                true_degrees[v]
            );
            assert!(est <= 8, "Vertex {v} degree estimate {est} should be <= 8");
        }
    }

    // ── GraphStream from_fn ───────────────────────────────────────────────────
    #[test]
    fn test_graph_stream_from_fn() {
        let data = vec![(0usize, 1usize), (1, 2)];
        let mut iter = data.into_iter();
        let mut stream = GraphStream::from_fn(move || iter.next());
        assert_eq!(stream.next_edge(), Some((0, 1)));
        assert_eq!(stream.next_edge(), Some((1, 2)));
        assert_eq!(stream.next_edge(), None);
    }
}