scirs2_graph/streaming/algorithms.rs
1//! Advanced streaming graph algorithms.
2//!
3//! This module provides algorithms that process massive graphs edge-by-edge
4//! without requiring the full graph to reside in memory.
5//!
6//! # Algorithms
7//!
8//! | Algorithm | Description |
9//! |-----------|-------------|
10//! | [`StreamingTriangleCounter`] | Reservoir + window exact/approximate triangle counting |
11//! | [`StreamingUnionFind`] | Online connected-components via path-compressed Union-Find |
//! | [`streaming_bfs`] | Multi-pass BFS over a buffered snapshot of the edge stream |
13//! | [`StreamingDegreeEstimator`] | Count-min sketch approximate degree queries |
14
15use std::collections::{HashMap, HashSet, VecDeque};
16
17// ────────────────────────────────────────────────────────────────────────────
18// StreamConfig
19// ────────────────────────────────────────────────────────────────────────────
20
/// Configuration shared by the streaming graph algorithms in this module.
///
/// Start from [`StreamConfig::default`] and override individual fields with
/// struct-update syntax (`StreamConfig { window_size: 100, ..Default::default() }`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamConfig {
    /// Number of recent edges kept in the sliding window (for triangle counting).
    pub window_size: usize,
    /// Requested log2 of the count-min sketch width.
    ///
    /// [`StreamingDegreeEstimator`] clamps this value to 16, so the effective
    /// row width is `w = 2^min(n_sketch_bits, 16)`. Values above 16 (including
    /// the default of 64) therefore all select the maximum width of 65 536.
    pub n_sketch_bits: usize,
    /// Base random seed from which the per-row hash seeds are derived.
    pub seed: u64,
}

impl Default for StreamConfig {
    /// Window of 1000 edges, the widest sketch the estimator allows, seed 42.
    fn default() -> Self {
        Self {
            window_size: 1000,
            n_sketch_bits: 64,
            seed: 42,
        }
    }
}
41
42// ────────────────────────────────────────────────────────────────────────────
43// GraphStream
44// ────────────────────────────────────────────────────────────────────────────
45
/// An iterator-backed stream of graph edges `(u, v)`.
///
/// Use [`GraphStream::from_edges`] to create a stream from a vector, or
/// [`GraphStream::from_fn`] to wrap a lazy generator (e.g. reading a file
/// line-by-line without loading it all).
///
/// The stream can be driven either through [`GraphStream::next_edge`] or
/// through its [`Iterator`] implementation; both pull from the same
/// underlying source, so an edge is yielded exactly once.
pub struct GraphStream {
    /// Type-erased edge source.
    inner: Box<dyn Iterator<Item = (usize, usize)>>,
}

impl GraphStream {
    /// Create a [`GraphStream`] from an owned vector of edges.
    ///
    /// Edges are yielded in the order they appear in `edges`.
    pub fn from_edges(edges: Vec<(usize, usize)>) -> Self {
        Self {
            inner: Box::new(edges.into_iter()),
        }
    }

    /// Create a [`GraphStream`] from a closure that produces `Some((u, v))` or `None`.
    ///
    /// The closure is wrapped with [`std::iter::from_fn`], which is not fused:
    /// if the stream is polled again after the closure returned `None`, the
    /// closure is simply called again.
    pub fn from_fn(f: impl FnMut() -> Option<(usize, usize)> + 'static) -> Self {
        Self {
            inner: Box::new(std::iter::from_fn(f)),
        }
    }

    /// Advance the stream by one edge. Returns `None` when exhausted.
    pub fn next_edge(&mut self) -> Option<(usize, usize)> {
        self.inner.next()
    }
}

impl Iterator for GraphStream {
    type Item = (usize, usize);

    /// Equivalent to [`GraphStream::next_edge`].
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    /// Forwards the hint of the wrapped source so `collect`/`extend` can pre-allocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
75
76// ────────────────────────────────────────────────────────────────────────────
77// StreamingTriangleCounter
78// ────────────────────────────────────────────────────────────────────────────
79
80/// Streaming triangle counter using a sliding adjacency window.
81///
82/// For each new edge `(u, v)` the algorithm counts common neighbours already
83/// present in the current window, giving an exact count for edges within the
84/// window and an approximate count when the stream overflows it.
85///
86/// For small graphs (total edges ≤ `window_size`) this is an exact counter.
87///
88/// # References
89/// Buriol et al., "Counting Triangles in Data Streams", PODS 2006.
90#[derive(Debug)]
91pub struct StreamingTriangleCounter {
92 /// Partial adjacency maintained for the current window.
93 adjacency_sketch: HashMap<usize, HashSet<usize>>,
94 /// Accumulated triangle estimate.
95 triangle_count: f64,
96 /// Number of edges processed so far.
97 n_edges: usize,
98 /// Sliding reservoir of recent edges (FIFO, bounded by `window_size`).
99 reservoir: VecDeque<(usize, usize)>,
100 /// Maximum window size.
101 window_size: usize,
102}
103
104impl StreamingTriangleCounter {
105 /// Create a new counter with the given configuration.
106 pub fn new(config: StreamConfig) -> Self {
107 Self {
108 adjacency_sketch: HashMap::new(),
109 triangle_count: 0.0,
110 n_edges: 0,
111 reservoir: VecDeque::new(),
112 window_size: config.window_size,
113 }
114 }
115
116 /// Process a single edge from the stream.
117 ///
118 /// New triangles formed by `(u, v)` together with any common neighbour `w`
119 /// already present in the adjacency sketch are counted. When the reservoir
120 /// is full the oldest edge is evicted and removed from the sketch.
121 pub fn process_edge(&mut self, u: usize, v: usize) {
122 self.n_edges += 1;
123
124 // Count triangles closed by this edge: |N(u) ∩ N(v)|
125 let neighbours_u: HashSet<usize> =
126 self.adjacency_sketch.get(&u).cloned().unwrap_or_default();
127 let neighbours_v: HashSet<usize> =
128 self.adjacency_sketch.get(&v).cloned().unwrap_or_default();
129
130 let common = neighbours_u.intersection(&neighbours_v).count();
131
132 // Scale factor: when the window is full we are counting on a sample of
133 // size `window_size / n_edges` of all edges, so scale up by (n/m)^2.
134 let scale = if self.n_edges <= self.window_size {
135 1.0
136 } else {
137 let m = self.window_size as f64;
138 let n = self.n_edges as f64;
139 (n / m) * (n / m)
140 };
141 self.triangle_count += common as f64 * scale;
142
143 // Add edge to sketch (undirected)
144 self.adjacency_sketch.entry(u).or_default().insert(v);
145 self.adjacency_sketch.entry(v).or_default().insert(u);
146 self.reservoir.push_back((u, v));
147
148 // Evict oldest edge when window overflows
149 if self.reservoir.len() > self.window_size {
150 if let Some((eu, ev)) = self.reservoir.pop_front() {
151 if let Some(set) = self.adjacency_sketch.get_mut(&eu) {
152 set.remove(&ev);
153 }
154 if let Some(set) = self.adjacency_sketch.get_mut(&ev) {
155 set.remove(&eu);
156 }
157 }
158 }
159 }
160
161 /// Return the current triangle estimate.
162 pub fn estimate_triangles(&self) -> f64 {
163 self.triangle_count
164 }
165
166 /// Drive the counter over an entire [`GraphStream`], returning the final estimate.
167 pub fn process_stream(&mut self, stream: &mut GraphStream) -> f64 {
168 while let Some((u, v)) = stream.next_edge() {
169 self.process_edge(u, v);
170 }
171 self.estimate_triangles()
172 }
173}
174
175// ────────────────────────────────────────────────────────────────────────────
176// StreamingBFS
177// ────────────────────────────────────────────────────────────────────────────
178
/// Tunable parameters for [`streaming_bfs`].
#[derive(Debug, Clone)]
pub struct StreamingBfsConfig {
    /// Vertex the search starts from; it is always reported at distance 0.
    pub source: usize,
    /// Largest distance (inclusive) that will be assigned to any vertex.
    pub max_dist: usize,
    /// Soft memory limit: once this many vertices have a recorded distance,
    /// no further vertices are added.
    pub memory_limit: usize,
}

impl Default for StreamingBfsConfig {
    /// Source vertex 0, unbounded depth, at most 10 000 recorded vertices.
    fn default() -> Self {
        let (source, max_dist, memory_limit) = (0, usize::MAX, 10_000);
        StreamingBfsConfig {
            source,
            max_dist,
            memory_limit,
        }
    }
}
199
/// Outcome of a [`streaming_bfs`] run.
#[derive(Debug, Clone)]
pub struct StreamBfsResult {
    /// Hop distance from the source for every vertex that was recorded
    /// (the source itself is present with distance 0).
    pub distances: HashMap<usize, usize>,
    /// How many full scans of the edge list were performed, including a
    /// final scan that discovered nothing new.
    pub n_passes: usize,
    /// Number of entries in `distances`, i.e. distinct vertices reached
    /// (source included).
    pub n_vertices_reached: usize,
}
210
211/// Memory-efficient multi-pass BFS over a [`GraphStream`].
212///
213/// Because a streaming BFS cannot rewind the stream, we work over a snapshot
214/// of edges (the stream is fully consumed once and stored as an edge list).
215/// Pass `k` discovers all vertices at distance `k` from the source. Only the
216/// current frontier and the visited set need to be kept in memory.
217///
218/// # Algorithm
219/// 1. Consume the stream into an edge list (required for multi-pass).
220/// 2. Pass 0: initialise `visited = {source}`, `frontier = {source}`, `dist[source] = 0`.
221/// 3. Pass k: scan every edge; if exactly one endpoint is in the frontier and
222/// the other is unvisited, add the other to the next frontier at distance k+1.
223/// 4. Repeat until no new vertices are discovered or `max_dist` is reached.
224pub fn streaming_bfs(stream: &mut GraphStream, config: &StreamingBfsConfig) -> StreamBfsResult {
225 // Collect all edges for multi-pass traversal
226 let mut edges: Vec<(usize, usize)> = Vec::new();
227 while let Some(e) = stream.next_edge() {
228 edges.push(e);
229 }
230
231 let source = config.source;
232 let mut distances: HashMap<usize, usize> = HashMap::new();
233 distances.insert(source, 0);
234
235 let mut frontier: HashSet<usize> = HashSet::new();
236 frontier.insert(source);
237
238 let mut n_passes = 0usize;
239 let mut current_dist = 0usize;
240
241 while !frontier.is_empty() && current_dist < config.max_dist {
242 let mut next_frontier: HashSet<usize> = HashSet::new();
243 // Scan the full edge list for edges crossing from frontier to unvisited
244 for &(u, v) in &edges {
245 // Check both directions (undirected stream)
246 for &(a, b) in &[(u, v), (v, u)] {
247 if frontier.contains(&a)
248 && !distances.contains_key(&b)
249 && distances.len() < config.memory_limit
250 {
251 distances.insert(b, current_dist + 1);
252 next_frontier.insert(b);
253 }
254 }
255 }
256 n_passes += 1;
257 current_dist += 1;
258 frontier = next_frontier;
259 }
260
261 let n_vertices_reached = distances.len();
262 StreamBfsResult {
263 distances,
264 n_passes,
265 n_vertices_reached,
266 }
267}
268
269// ────────────────────────────────────────────────────────────────────────────
270// StreamingDegreeEstimator (count-min sketch)
271// ────────────────────────────────────────────────────────────────────────────
272
273/// Streaming degree estimator backed by a count-min sketch.
274///
275/// Each call to `process_edge` increments the sketch cell for both endpoints.
276/// The sketch uses `d` independent hash functions, each with width `w` determined
277/// by `n_sketch_bits` (w = min(2^n_sketch_bits, 2^16) to keep memory bounded).
278/// Degree queries return the minimum over all `d` rows.
279///
280/// # Space
281/// O(d × w) counters where d = 4 and w = 2^min(n_sketch_bits, 16).
282#[derive(Debug)]
283pub struct StreamingDegreeEstimator {
284 /// count_min[row][col] — d rows × w columns.
285 count_min: Vec<Vec<u32>>,
286 /// Number of hash functions (rows).
287 d: usize,
288 /// Width of each row.
289 w: usize,
290 /// Number of edges processed.
291 n_edges: usize,
292 /// Hash seeds, one per row.
293 seeds: Vec<u64>,
294}
295
296impl StreamingDegreeEstimator {
297 /// Create a new estimator with the given configuration.
298 pub fn new(config: StreamConfig) -> Self {
299 let d = 4usize;
300 // Cap width at 2^16 = 65536 to keep memory sensible
301 let bits = config.n_sketch_bits.min(16);
302 let w = 1usize << bits;
303
304 // Derive d different seeds from the base seed
305 let seeds: Vec<u64> = (0..d)
306 .map(|i| {
307 config
308 .seed
309 .wrapping_add((i as u64).wrapping_mul(0x9e37_79b9_7f4a_7c15))
310 })
311 .collect();
312
313 Self {
314 count_min: vec![vec![0u32; w]; d],
315 d,
316 w,
317 n_edges: 0,
318 seeds,
319 }
320 }
321
322 /// Hash a vertex id using seed `s` to a column index in `[0, w)`.
323 fn hash_vertex(&self, vertex: usize, seed: u64) -> usize {
324 // FNV-1a inspired mix
325 let mut h = seed ^ (vertex as u64).wrapping_mul(0x9e37_79b9_7f4a_7c15);
326 h ^= h >> 33;
327 h = h.wrapping_mul(0xff51_afd7_ed55_8ccd);
328 h ^= h >> 33;
329 h = h.wrapping_mul(0xc4ce_b9fe_1a85_ec53);
330 h ^= h >> 33;
331 (h as usize) % self.w
332 }
333
334 /// Process a single edge `(u, v)` — increments degree counters for both endpoints.
335 pub fn process_edge(&mut self, u: usize, v: usize) {
336 self.n_edges += 1;
337 for row in 0..self.d {
338 let seed = self.seeds[row];
339 let col_u = self.hash_vertex(u, seed);
340 let col_v = self.hash_vertex(v, seed);
341 self.count_min[row][col_u] = self.count_min[row][col_u].saturating_add(1);
342 self.count_min[row][col_v] = self.count_min[row][col_v].saturating_add(1);
343 }
344 }
345
346 /// Estimate the degree of `vertex` — returns the minimum counter across rows.
347 pub fn estimate_degree(&self, vertex: usize) -> u32 {
348 (0..self.d)
349 .map(|row| {
350 let col = self.hash_vertex(vertex, self.seeds[row]);
351 self.count_min[row][col]
352 })
353 .min()
354 .unwrap_or(0)
355 }
356
357 /// Return estimated degrees for each vertex id in `0..n_vertices`.
358 pub fn approximate_degree_distribution(&self, n_vertices: usize) -> Vec<u32> {
359 (0..n_vertices).map(|v| self.estimate_degree(v)).collect()
360 }
361}
362
363// ────────────────────────────────────────────────────────────────────────────
364// StreamingUnionFind
365// ────────────────────────────────────────────────────────────────────────────
366
/// Online connected-components via path-compressed, union-by-rank Union-Find.
///
/// As edges arrive from the stream the structure is updated in O(α(n)) amortised
/// time per edge (inverse Ackermann), not counting hash-map overhead.
///
/// Vertices are registered lazily: any vertex passed to
/// [`StreamingUnionFind::process_edge`], [`StreamingUnionFind::find`] or
/// [`StreamingUnionFind::component_id`] that has not been seen before becomes
/// its own singleton component and is counted by
/// [`StreamingUnionFind::n_components`] from then on.
#[derive(Debug, Default)]
pub struct StreamingUnionFind {
    /// parent[x] = parent of x; root if parent[x] == x.
    parent: HashMap<usize, usize>,
    /// rank[x] = upper bound on tree height at x.
    rank: HashMap<usize, usize>,
}

impl StreamingUnionFind {
    /// Create an empty Union-Find structure.
    pub fn new() -> Self {
        Self::default()
    }

    /// Ensure vertex `x` is initialised (its own parent, rank 0).
    fn make_set(&mut self, x: usize) {
        self.parent.entry(x).or_insert(x);
        self.rank.entry(x).or_insert(0);
    }

    /// Parent of `x`, treating an unregistered vertex as its own root.
    fn parent_of(&self, x: usize) -> usize {
        self.parent.get(&x).copied().unwrap_or(x)
    }

    /// Find the representative of `x` with full path compression (iterative).
    ///
    /// Registers `x` as a singleton component if it has not been seen before.
    pub fn find(&mut self, x: usize) -> usize {
        self.make_set(x);

        // Walk up to the root.
        let mut root = x;
        loop {
            let up = self.parent_of(root);
            if up == root {
                break;
            }
            root = up;
        }

        // Path compression: point every node on the path directly to root.
        let mut current = x;
        while current != root {
            let next = self.parent_of(current);
            self.parent.insert(current, root);
            current = next;
        }
        root
    }

    /// Process edge `(u, v)` — union the two components.
    ///
    /// On equal ranks the root of `u`'s component stays the representative.
    pub fn process_edge(&mut self, u: usize, v: usize) {
        // `find` registers unseen endpoints, so no separate `make_set` is needed.
        let ru = self.find(u);
        let rv = self.find(v);
        if ru == rv {
            return; // already connected
        }

        // Union by rank: attach the shallower tree under the deeper one.
        let rank_u = self.rank.get(&ru).copied().unwrap_or(0);
        let rank_v = self.rank.get(&rv).copied().unwrap_or(0);
        match rank_u.cmp(&rank_v) {
            std::cmp::Ordering::Less => {
                self.parent.insert(ru, rv);
            }
            std::cmp::Ordering::Greater => {
                self.parent.insert(rv, ru);
            }
            std::cmp::Ordering::Equal => {
                self.parent.insert(rv, ru);
                self.rank.insert(ru, rank_u + 1);
            }
        }
    }

    /// Return the number of distinct connected components.
    ///
    /// Counts the registered vertices that are their own parent; this scans
    /// every registered vertex.
    pub fn n_components(&self) -> usize {
        self.parent.iter().filter(|(&node, &p)| node == p).count()
    }

    /// Return the canonical component identifier for `x`.
    ///
    /// Equivalent to [`StreamingUnionFind::find`], including the lazy
    /// registration of unseen vertices.
    pub fn component_id(&mut self, x: usize) -> usize {
        self.find(x)
    }
}
454
455// ────────────────────────────────────────────────────────────────────────────
456// Tests
457// ────────────────────────────────────────────────────────────────────────────
458
#[cfg(test)]
mod tests {
    use super::*;

    // ── Triangle counting: K4 has exactly 4 triangles ────────────────────────
    #[test]
    fn test_streaming_triangle_k4() {
        // K4 edges (6 total)
        let edges = vec![(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)];
        let config = StreamConfig {
            window_size: 100,
            ..Default::default()
        };
        let mut counter = StreamingTriangleCounter::new(config);
        let mut stream = GraphStream::from_edges(edges);
        let estimate = counter.process_stream(&mut stream);
        // K4 has exactly 4 triangles; with window >= 6 nothing is evicted or
        // scaled, so the count must be exact.
        assert!(
            (estimate - 4.0).abs() < 1.0,
            "Expected ~4.0 triangles, got {estimate}"
        );
    }

    // ── StreamingUnionFind: path graph is one component ──────────────────────
    #[test]
    fn test_streaming_union_find_path_graph() {
        let mut uf = StreamingUnionFind::new();
        // Path 0-1-2-3-4
        for i in 0..4usize {
            uf.process_edge(i, i + 1);
        }
        assert_eq!(uf.n_components(), 1, "Path graph should be one component");
        // All vertices should share the same component id
        let c0 = uf.component_id(0);
        for v in 1..5usize {
            assert_eq!(uf.component_id(v), c0);
        }
    }

    // ── StreamingUnionFind: disconnected graph has correct component count ───
    #[test]
    fn test_streaming_union_find_disconnected() {
        let mut uf = StreamingUnionFind::new();
        // Three disconnected edges: (0,1), (2,3), (4,5)
        uf.process_edge(0, 1);
        uf.process_edge(2, 3);
        uf.process_edge(4, 5);
        assert_eq!(uf.n_components(), 3);
    }

    // ── StreamingBFS: small graph ─────────────────────────────────────────────
    #[test]
    fn test_streaming_bfs_small_graph() {
        // Graph: 0-1, 1-2, 2-3, 1-3
        let edges = vec![(0, 1), (1, 2), (2, 3), (1, 3)];
        let mut stream = GraphStream::from_edges(edges);
        let config = StreamingBfsConfig {
            source: 0,
            ..Default::default()
        };
        let result = streaming_bfs(&mut stream, &config);
        assert_eq!(result.distances[&0], 0);
        assert_eq!(result.distances[&1], 1);
        assert_eq!(result.distances[&2], 2);
        assert_eq!(result.distances[&3], 2);
        assert_eq!(result.n_vertices_reached, 4);
    }

    // ── StreamingBFS: single source, star graph ───────────────────────────────
    #[test]
    fn test_streaming_bfs_star() {
        // Star: center 0, leaves 1..=5
        let edges: Vec<(usize, usize)> = (1..=5).map(|i| (0, i)).collect();
        let mut stream = GraphStream::from_edges(edges);
        let config = StreamingBfsConfig {
            source: 0,
            ..Default::default()
        };
        let result = streaming_bfs(&mut stream, &config);
        assert_eq!(result.distances[&0], 0);
        for leaf in 1..=5usize {
            assert_eq!(result.distances[&leaf], 1);
        }
    }

    // ── DegreeEstimator: known graph degrees ─────────────────────────────────
    #[test]
    fn test_degree_estimator_known_degrees() {
        // Build a graph where vertex 0 has degree 4
        // Edges: 0-1, 0-2, 0-3, 0-4
        let edges = vec![(0, 1), (0, 2), (0, 3), (0, 4)];
        let config = StreamConfig {
            n_sketch_bits: 8,
            ..Default::default()
        };
        let mut estimator = StreamingDegreeEstimator::new(config);
        for (u, v) in &edges {
            estimator.process_edge(*u, *v);
        }
        let est_deg_0 = estimator.estimate_degree(0);
        // A count-min sketch never underestimates: every row's cell for
        // vertex 0 received all 4 of its increments, so the minimum is >= 4.
        assert!(
            est_deg_0 >= 4,
            "Degree estimate for vertex 0 should be >= 4 (true=4), got {est_deg_0}"
        );
        // Leaves have degree 1
        for v in 1..=4usize {
            let est = estimator.estimate_degree(v);
            assert!(
                est >= 1,
                "Degree estimate for leaf {v} should be >= 1, got {est}"
            );
        }
    }

    // ── DegreeEstimator: degree distribution ─────────────────────────────────
    #[test]
    fn test_degree_estimator_distribution() {
        // Path 0-1-2-3-4: degrees are 1,2,2,2,1
        let edges = vec![(0, 1), (1, 2), (2, 3), (3, 4)];
        let true_degrees: [u32; 5] = [1, 2, 2, 2, 1];
        let config = StreamConfig {
            n_sketch_bits: 8,
            ..Default::default()
        };
        let mut estimator = StreamingDegreeEstimator::new(config);
        for (u, v) in &edges {
            estimator.process_edge(*u, *v);
        }
        let dist = estimator.approximate_degree_distribution(5);
        assert_eq!(dist.len(), true_degrees.len());
        // Lower bound: the sketch never underestimates the true degree.
        // Upper bound: 4 edges produce 8 endpoint increments in total, so no
        // cell (and hence no estimate) can exceed 8 even under collisions.
        for (v, (&est, &truth)) in dist.iter().zip(true_degrees.iter()).enumerate() {
            assert!(
                est >= truth,
                "Vertex {v} degree estimate {est} should be >= {truth}"
            );
            assert!(est <= 8, "Vertex {v} degree estimate {est} should be <= 8");
        }
    }

    // ── GraphStream from_fn ───────────────────────────────────────────────────
    #[test]
    fn test_graph_stream_from_fn() {
        let data = vec![(0usize, 1usize), (1, 2)];
        let mut iter = data.into_iter();
        let mut stream = GraphStream::from_fn(move || iter.next());
        assert_eq!(stream.next_edge(), Some((0, 1)));
        assert_eq!(stream.next_edge(), Some((1, 2)));
        assert_eq!(stream.next_edge(), None);
    }
}
605}