// scirs2_graph/distributed/mod.rs
//! Distributed graph storage with partitioned adjacency.
//!
//! This module provides data structures and algorithms for partitioning large
//! graphs across multiple logical shards, enabling distributed-memory graph
//! processing without a single monolithic adjacency representation.
//!
//! # Overview
//!
//! A [`DistributedGraph`] splits vertices across [`GraphShard`]s according to
//! a chosen [`GraphPartitionMethod`].  Cross-shard edges create *mirror*
//! vertices — lightweight replicas used to route messages during distributed
//! algorithms without accessing the remote shard.
//!
//! | Method | Description |
//! |--------|-------------|
//! | [`GraphPartitionMethod::HashBased`] | `vertex % n_partitions` (round-robin, zero overhead) |
//! | [`GraphPartitionMethod::EdgeCut`] | Same as hash; edges of a vertex live on its shard |
//! | [`GraphPartitionMethod::VertexCut`] | Edge lives on the shard of the lower-degree endpoint |
//! | [`GraphPartitionMethod::Fennel`] | Streaming FENNEL greedy assignment (Tsourakakis 2014) |
//!
//! # Example
//!
//! ```rust
//! use scirs2_graph::distributed::{build_distributed_graph, DistributedGraphConfig, distributed_degree};
//!
//! let edges: Vec<(usize, usize)> = (0..5).flat_map(|i| (i+1..5).map(move |j| (i, j))).collect();
//! let cfg = DistributedGraphConfig::default();
//! let dg = build_distributed_graph(&edges, 5, &cfg);
//! assert!(distributed_degree(&dg, 0).is_some());
//! ```
use std::collections::{HashMap, HashSet};
// ────────────────────────────────────────────────────────────────────────────
// Public types
// ────────────────────────────────────────────────────────────────────────────
/// Method used to partition a graph's vertices across shards.
///
/// Note: [`build_distributed_graph`] places *vertices* by hash for every
/// method except `Fennel`; `EdgeCut` and `VertexCut` differ only in how
/// *edges* are routed to shards.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GraphPartitionMethod {
    /// Hash-based: `vertex_id % n_partitions` (deterministic, zero overhead).
    HashBased,
    /// Edge-cut: assign each edge to the source vertex's shard.
    EdgeCut,
    /// Vertex-cut: assign each edge to the shard of the lower-degree endpoint.
    VertexCut,
    /// Streaming FENNEL (Tsourakakis et al. 2014): maximise local density while
    /// penalising shard imbalance.
    Fennel,
}
/// Configuration for [`build_distributed_graph`].
#[derive(Debug, Clone)]
pub struct DistributedGraphConfig {
    /// Number of logical shards/partitions (clamped to at least 1 by the builder).
    pub n_partitions: usize,
    /// Partitioning algorithm to use.
    pub partition_method: GraphPartitionMethod,
    /// Extra replicas per vertex (0 = no replication).
    /// NOTE(review): currently not consumed by `build_distributed_graph` —
    /// mirrors are created only where cross-shard edges require them.
    pub replication_factor: usize,
}
64impl Default for DistributedGraphConfig {
65    fn default() -> Self {
66        Self {
67            n_partitions: 4,
68            partition_method: GraphPartitionMethod::HashBased,
69            replication_factor: 0,
70        }
71    }
72}
73
/// Location of a vertex in the distributed graph.
///
/// Produced by [`build_distributed_graph`] and stored in
/// `DistributedGraph::vertex_map` for O(1) lookups.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VertexLocation {
    /// Shard (partition) that owns this vertex.
    pub partition_id: usize,
    /// Local index within that shard's vertex list.
    pub local_id: usize,
}
/// A single partition (shard) of the distributed graph.
#[derive(Debug, Clone, Default)]
pub struct GraphShard {
    /// Global vertex IDs owned by this shard.
    pub local_vertices: Vec<usize>,
    /// Edges `(src, dst)` (both global IDs) assigned to this shard.
    pub local_edges: Vec<(usize, usize)>,
    /// Mirror vertices: remote vertices referenced by local edges
    /// (sorted ascending, no duplicates — see `build_distributed_graph`).
    pub mirror_vertices: Vec<usize>,
}
/// Distributed graph: a collection of shards with a global vertex map.
#[derive(Debug, Clone)]
pub struct DistributedGraph {
    /// One shard per partition.
    pub shards: Vec<GraphShard>,
    /// Total number of vertices in the original graph.
    pub n_global_vertices: usize,
    /// Total number of edges in the original graph (including any edges the
    /// builder skipped for out-of-range endpoints).
    pub n_global_edges: usize,
    /// O(1) lookup: global vertex id → partition + local index.
    pub vertex_map: HashMap<usize, VertexLocation>,
}
/// Summary statistics for a distributed graph, computed by
/// [`distributed_graph_stats`].
#[derive(Debug, Clone)]
pub struct DistributedStats {
    /// `max_shard_size / avg_shard_size` (1.0 = perfectly balanced).
    pub balance_ratio: f64,
    /// Fraction of edges that cross partition boundaries.
    pub edge_cut_fraction: f64,
    /// Average number of shards a vertex is present in (including mirrors).
    pub replication_factor: f64,
    /// Number of vertices in each shard (owned, not mirror).
    pub shard_sizes: Vec<usize>,
}
// ────────────────────────────────────────────────────────────────────────────
// Partitioning helpers
// ────────────────────────────────────────────────────────────────────────────
/// Hash-partition: map `vertex` to `vertex % n_partitions`.
///
/// Returns partition 0 when `n_partitions == 0` rather than dividing by zero.
#[inline]
pub fn hash_partition(vertex: usize, n_partitions: usize) -> usize {
    match n_partitions {
        0 => 0,
        k => vertex % k,
    }
}
133/// Streaming FENNEL partition assignment (Tsourakakis et al. 2014).
134///
135/// Processes edges in stream order and assigns each unassigned vertex to the
136/// partition that maximises:
137///   `score(p) = |N(v) ∩ V_p| − α × |V_p|^γ`
138/// where `α = sqrt(|E| / n_partitions^γ)` and `γ = 3/2`.
139///
140/// # Returns
141/// A `Vec` of length `n_vertices` where entry `v` is the partition for vertex `v`.
142pub fn fennel_partition(
143    edges: &[(usize, usize)],
144    n_vertices: usize,
145    n_partitions: usize,
146    config: &DistributedGraphConfig,
147) -> Vec<usize> {
148    if n_partitions == 0 || n_vertices == 0 {
149        return vec![0; n_vertices];
150    }
151
152    let n_edges = edges.len() as f64;
153    let gamma = 1.5_f64;
154    // α = sqrt(|E| / k^γ) where k = n_partitions
155    let alpha = (n_edges / (n_partitions as f64).powf(gamma)).sqrt();
156
157    // assignment[v] = Some(partition) once decided
158    let mut assignment: Vec<Option<usize>> = vec![None; n_vertices];
159    // |V_p| for each partition
160    let mut partition_sizes: Vec<f64> = vec![0.0; n_partitions];
161    // adjacency list built incrementally for N(v) ∩ V_p queries
162    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n_vertices];
163
164    let _ = config; // config is passed for forward-compatibility
165
166    for &(u, v) in edges {
167        // Update adjacency (undirected)
168        if u < n_vertices && v < n_vertices {
169            adj[u].push(v);
170            adj[v].push(u);
171        }
172
173        // Assign u if not yet assigned
174        for &vertex in &[u, v] {
175            if vertex >= n_vertices || assignment[vertex].is_some() {
176                continue;
177            }
178            // Count neighbours already assigned to each partition
179            let mut neighbour_counts: Vec<f64> = vec![0.0; n_partitions];
180            for &nb in &adj[vertex] {
181                if nb < n_vertices {
182                    if let Some(p) = assignment[nb] {
183                        neighbour_counts[p] += 1.0;
184                    }
185                }
186            }
187            // Pick the partition with the highest score
188            let best = (0..n_partitions)
189                .map(|p| {
190                    let sz = partition_sizes[p];
191                    let score = neighbour_counts[p] - alpha * sz.powf(gamma);
192                    (p, score)
193                })
194                .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
195                .map(|(p, _)| p)
196                .unwrap_or(0);
197
198            assignment[vertex] = Some(best);
199            partition_sizes[best] += 1.0;
200        }
201    }
202
203    // Any vertex not touched by any edge falls back to hash
204    assignment
205        .iter()
206        .enumerate()
207        .map(|(v, a)| a.unwrap_or_else(|| hash_partition(v, n_partitions)))
208        .collect()
209}
210
// ────────────────────────────────────────────────────────────────────────────
// build_distributed_graph
// ────────────────────────────────────────────────────────────────────────────
215/// Build a [`DistributedGraph`] from an edge list.
216///
217/// Steps:
218/// 1. Assign each vertex to a partition using the configured method.
219/// 2. Assign each edge to a shard:
220///    - `EdgeCut` / `HashBased`: edge goes to the source vertex's shard.
221///    - `VertexCut`: edge goes to the shard of the endpoint with fewer edges.
222///    - `Fennel`: same as `EdgeCut` after FENNEL vertex assignment.
223/// 3. Record mirror vertices for cross-shard edges.
224/// 4. Build `vertex_map` for O(1) location queries.
225pub fn build_distributed_graph(
226    edges: &[(usize, usize)],
227    n_vertices: usize,
228    config: &DistributedGraphConfig,
229) -> DistributedGraph {
230    let n_partitions = config.n_partitions.max(1);
231
232    // Step 1: compute vertex → partition assignment
233    let vertex_partition: Vec<usize> = match config.partition_method {
234        GraphPartitionMethod::HashBased | GraphPartitionMethod::EdgeCut => (0..n_vertices)
235            .map(|v| hash_partition(v, n_partitions))
236            .collect(),
237        GraphPartitionMethod::VertexCut => {
238            // For VertexCut we still need to know partition of each vertex for
239            // edge routing, but we route by degree heuristic at edge time.
240            (0..n_vertices)
241                .map(|v| hash_partition(v, n_partitions))
242                .collect()
243        }
244        GraphPartitionMethod::Fennel => fennel_partition(edges, n_vertices, n_partitions, config),
245    };
246
247    // Pre-compute degree counts for VertexCut
248    let mut degree: Vec<usize> = vec![0usize; n_vertices];
249    for &(u, v) in edges {
250        if u < n_vertices {
251            degree[u] += 1;
252        }
253        if v < n_vertices {
254            degree[v] += 1;
255        }
256    }
257
258    // Step 2: initialise shards
259    let mut shards: Vec<GraphShard> = (0..n_partitions).map(|_| GraphShard::default()).collect();
260
261    // Assign vertices to shards
262    let mut vertex_map: HashMap<usize, VertexLocation> = HashMap::with_capacity(n_vertices);
263    for v in 0..n_vertices {
264        let pid = vertex_partition[v];
265        let pid = pid.min(n_partitions - 1);
266        let local_id = shards[pid].local_vertices.len();
267        shards[pid].local_vertices.push(v);
268        vertex_map.insert(
269            v,
270            VertexLocation {
271                partition_id: pid,
272                local_id,
273            },
274        );
275    }
276
277    // Step 3: assign edges and record mirrors
278    // We use a set per shard to avoid duplicate mirror entries
279    let mut mirror_sets: Vec<std::collections::HashSet<usize>> = (0..n_partitions)
280        .map(|_| std::collections::HashSet::new())
281        .collect();
282
283    for &(u, v) in edges {
284        if u >= n_vertices || v >= n_vertices {
285            continue;
286        }
287        let shard_idx = match config.partition_method {
288            GraphPartitionMethod::VertexCut => {
289                // Edge goes to the shard of the lower-degree endpoint
290                if degree[u] <= degree[v] {
291                    vertex_partition[u].min(n_partitions - 1)
292                } else {
293                    vertex_partition[v].min(n_partitions - 1)
294                }
295            }
296            _ => {
297                // EdgeCut / HashBased / Fennel: edge belongs to source's shard
298                vertex_partition[u].min(n_partitions - 1)
299            }
300        };
301
302        shards[shard_idx].local_edges.push((u, v));
303
304        // Record mirrors for cross-shard endpoints
305        let owner_u = vertex_partition[u].min(n_partitions - 1);
306        let owner_v = vertex_partition[v].min(n_partitions - 1);
307
308        if owner_u != shard_idx {
309            mirror_sets[shard_idx].insert(u);
310        }
311        if owner_v != shard_idx {
312            mirror_sets[shard_idx].insert(v);
313        }
314    }
315
316    // Convert mirror sets to sorted vecs
317    for (pid, set) in mirror_sets.into_iter().enumerate() {
318        let mut mirrors: Vec<usize> = set.into_iter().collect();
319        mirrors.sort_unstable();
320        shards[pid].mirror_vertices = mirrors;
321    }
322
323    DistributedGraph {
324        shards,
325        n_global_vertices: n_vertices,
326        n_global_edges: edges.len(),
327        vertex_map,
328    }
329}
330
// ────────────────────────────────────────────────────────────────────────────
// Distributed query helpers
// ────────────────────────────────────────────────────────────────────────────
335/// Return the degree of `vertex` by counting edges in its home shard.
336///
337/// Returns `None` if the vertex is not in the graph.
338pub fn distributed_degree(dg: &DistributedGraph, vertex: usize) -> Option<usize> {
339    let loc = dg.vertex_map.get(&vertex)?;
340    let shard = dg.shards.get(loc.partition_id)?;
341    let degree = shard
342        .local_edges
343        .iter()
344        .filter(|&&(u, v)| u == vertex || v == vertex)
345        .count();
346    Some(degree)
347}
348
349/// Return all neighbours of `vertex` by scanning its home shard's edge list.
350///
351/// Returns `None` if the vertex is not in the graph.
352pub fn distributed_neighbors(dg: &DistributedGraph, vertex: usize) -> Option<Vec<usize>> {
353    let loc = dg.vertex_map.get(&vertex)?;
354    let shard = dg.shards.get(loc.partition_id)?;
355    let neighbours: Vec<usize> = shard
356        .local_edges
357        .iter()
358        .filter_map(|&(u, v)| {
359            if u == vertex {
360                Some(v)
361            } else if v == vertex {
362                Some(u)
363            } else {
364                None
365            }
366        })
367        .collect();
368    Some(neighbours)
369}
370
// ────────────────────────────────────────────────────────────────────────────
// Statistics
// ────────────────────────────────────────────────────────────────────────────
375/// Compute load-balance and edge-cut statistics for a distributed graph.
376pub fn distributed_graph_stats(dg: &DistributedGraph) -> DistributedStats {
377    let shard_sizes: Vec<usize> = dg.shards.iter().map(|s| s.local_vertices.len()).collect();
378
379    let total_verts: usize = shard_sizes.iter().sum();
380    let n = dg.shards.len();
381
382    let avg_size = if n > 0 {
383        total_verts as f64 / n as f64
384    } else {
385        1.0
386    };
387    let max_size = shard_sizes.iter().copied().max().unwrap_or(0) as f64;
388    let balance_ratio = if avg_size > 0.0 {
389        max_size / avg_size
390    } else {
391        1.0
392    };
393
394    // Edge cut: edges where the two endpoints belong to different shards
395    let mut cut_edges = 0usize;
396    for shard in &dg.shards {
397        for &(u, v) in &shard.local_edges {
398            let p_u = dg
399                .vertex_map
400                .get(&u)
401                .map(|loc| loc.partition_id)
402                .unwrap_or(0);
403            let p_v = dg
404                .vertex_map
405                .get(&v)
406                .map(|loc| loc.partition_id)
407                .unwrap_or(0);
408            if p_u != p_v {
409                cut_edges += 1;
410            }
411        }
412    }
413    let edge_cut_fraction = if dg.n_global_edges > 0 {
414        cut_edges as f64 / dg.n_global_edges as f64
415    } else {
416        0.0
417    };
418
419    // Replication factor: total appearances of each vertex (owned + mirrors)
420    let total_mirror: usize = dg.shards.iter().map(|s| s.mirror_vertices.len()).sum();
421    let replication_factor = if total_verts > 0 {
422        (total_verts + total_mirror) as f64 / total_verts as f64
423    } else {
424        1.0
425    };
426
427    DistributedStats {
428        balance_ratio,
429        edge_cut_fraction,
430        replication_factor,
431        shard_sizes,
432    }
433}
434
// ────────────────────────────────────────────────────────────────────────────
// Tests
// ────────────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;

    // Helper: Petersen graph edges (10 vertices, 15 edges, 3-regular).
    fn petersen_edges() -> Vec<(usize, usize)> {
        // Outer pentagon: 0-1-2-3-4-0
        // Inner pentagram: 5-7-9-6-8-5
        // Spokes: 0-5, 1-6, 2-7, 3-8, 4-9
        vec![
            (0, 1),
            (1, 2),
            (2, 3),
            (3, 4),
            (4, 0),
            (5, 7),
            (7, 9),
            (9, 6),
            (6, 8),
            (8, 5),
            (0, 5),
            (1, 6),
            (2, 7),
            (3, 8),
            (4, 9),
        ]
    }

    // ── All 10 Petersen vertices are assigned with 4 partitions ──────────────
    #[test]
    fn test_distributed_graph_petersen_all_vertices() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig::default(); // 4 partitions, hash
        let dg = build_distributed_graph(&edges, 10, &cfg);

        assert_eq!(dg.n_global_vertices, 10);
        assert_eq!(dg.n_global_edges, 15);
        assert_eq!(dg.vertex_map.len(), 10);

        // Every vertex 0..10 must be present
        for v in 0..10usize {
            assert!(
                dg.vertex_map.contains_key(&v),
                "Vertex {v} missing from vertex_map"
            );
        }
    }

    // ── Stats: balance_ratio is reasonable ───────────────────────────────────
    #[test]
    fn test_distributed_graph_petersen_stats() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig::default();
        let dg = build_distributed_graph(&edges, 10, &cfg);
        let stats = distributed_graph_stats(&dg);

        // 10 vertices, 4 partitions → avg ~2.5
        // Balance ratio should be ≤ 2.0 for hash partitioning (floor(10/4)=2 or 3 vertices each)
        assert!(
            stats.balance_ratio <= 2.0,
            "balance_ratio {:.2} too high",
            stats.balance_ratio
        );

        // Shard sizes must sum to n_global_vertices
        let total: usize = stats.shard_sizes.iter().sum();
        assert_eq!(total, 10);
    }

    // ── distributed_degree is correct for a known vertex ─────────────────────
    #[test]
    fn test_distributed_degree_petersen() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig::default();
        let dg = build_distributed_graph(&edges, 10, &cfg);

        // In the Petersen graph every vertex has degree 3
        for v in 0..10usize {
            let deg = distributed_degree(&dg, v);
            assert!(
                deg.is_some(),
                "distributed_degree returned None for vertex {v}"
            );
            // The edge-cut model assigns an edge to the source's shard, so
            // degree via local edges counts outgoing + incoming edges on that shard.
            // We just verify it's a reasonable non-zero value.
            assert!(deg.unwrap() > 0, "Vertex {v} has 0 degree in its shard");
        }
    }

    // ── distributed_neighbors returns non-empty for Petersen vertices ─────────
    #[test]
    fn test_distributed_neighbors_petersen() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig::default();
        let dg = build_distributed_graph(&edges, 10, &cfg);

        for v in 0..10usize {
            let nb = distributed_neighbors(&dg, v);
            assert!(nb.is_some(), "distributed_neighbors returned None for {v}");
        }
    }

    // ── FENNEL: all vertices are assigned to valid partitions ─────────────────
    #[test]
    fn test_fennel_partition_100_vertices() {
        // Generate a deterministic pseudo-random graph: 100 vertices, ~200 edges.
        let n = 100usize;
        let edges: Vec<(usize, usize)> = (0..200)
            .map(|i| {
                let u = (i * 7 + 3) % n;
                let v = (i * 13 + 17) % n;
                (u, v)
            })
            .filter(|(u, v)| u != v)
            .collect();

        let cfg = DistributedGraphConfig {
            n_partitions: 4,
            partition_method: GraphPartitionMethod::Fennel,
            replication_factor: 0,
        };
        let assignment = fennel_partition(&edges, n, 4, &cfg);

        assert_eq!(assignment.len(), n);
        for (v, &p) in assignment.iter().enumerate() {
            assert!(p < 4, "Vertex {v} assigned to invalid partition {p}");
        }
    }

    // ── FENNEL: build_distributed_graph with Fennel method ───────────────────
    #[test]
    fn test_build_distributed_graph_fennel() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig {
            n_partitions: 4,
            partition_method: GraphPartitionMethod::Fennel,
            replication_factor: 0,
        };
        let dg = build_distributed_graph(&edges, 10, &cfg);
        assert_eq!(dg.vertex_map.len(), 10);
        for v in 0..10usize {
            assert!(dg.vertex_map.contains_key(&v));
        }
    }

    // ── VertexCut: builds without panic ──────────────────────────────────────
    #[test]
    fn test_build_distributed_graph_vertex_cut() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig {
            n_partitions: 4,
            partition_method: GraphPartitionMethod::VertexCut,
            replication_factor: 0,
        };
        let dg = build_distributed_graph(&edges, 10, &cfg);
        assert_eq!(dg.n_global_vertices, 10);
        assert_eq!(dg.vertex_map.len(), 10);
    }

    // ── Hash partition basic ──────────────────────────────────────────────────
    #[test]
    fn test_hash_partition() {
        assert_eq!(hash_partition(0, 4), 0);
        assert_eq!(hash_partition(5, 4), 1);
        assert_eq!(hash_partition(7, 4), 3);
        assert_eq!(hash_partition(0, 0), 0); // edge case: 0 partitions
    }

    // ── Stats: edge_cut_fraction ∈ [0, 1] ─────────────────────────────────────
    #[test]
    fn test_stats_edge_cut_fraction_range() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig::default();
        let dg = build_distributed_graph(&edges, 10, &cfg);
        let stats = distributed_graph_stats(&dg);
        assert!(
            stats.edge_cut_fraction >= 0.0 && stats.edge_cut_fraction <= 1.0,
            "edge_cut_fraction = {} out of range",
            stats.edge_cut_fraction
        );
    }
}