use std::collections::HashMap;
/// Strategy used by [`build_distributed_graph`] to place vertices and edges
/// onto partitions.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GraphPartitionMethod {
    /// Owner partition = vertex id modulo partition count.
    HashBased,
    /// Edge-cut placement; currently handled identically to `HashBased`
    /// by the builder in this file.
    EdgeCut,
    /// Hash-based vertex ownership, but each edge is stored on the owner
    /// shard of its lower-degree endpoint.
    VertexCut,
    /// Greedy streaming assignment (Fennel-style) via [`fennel_partition`].
    Fennel,
}
/// Configuration for partitioning a graph across shards.
#[derive(Debug, Clone)]
pub struct DistributedGraphConfig {
    // Number of shards to create; clamped to at least 1 by the builder.
    pub n_partitions: usize,
    // How vertices/edges are assigned to shards.
    pub partition_method: GraphPartitionMethod,
    // NOTE(review): not consulted anywhere in this file — presumably a
    // knob for future replication support; confirm before relying on it.
    pub replication_factor: usize,
}
impl Default for DistributedGraphConfig {
fn default() -> Self {
Self {
n_partitions: 4,
partition_method: GraphPartitionMethod::HashBased,
replication_factor: 0,
}
}
}
/// Where a global vertex lives: which shard owns it, and its index within
/// that shard's `local_vertices` list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VertexLocation {
    // Index into `DistributedGraph::shards`.
    pub partition_id: usize,
    // Position of the vertex inside the owning shard's `local_vertices`.
    pub local_id: usize,
}
/// One partition of the distributed graph.
#[derive(Debug, Clone, Default)]
pub struct GraphShard {
    // Global ids of vertices owned by this shard, in insertion order.
    pub local_vertices: Vec<usize>,
    // Edges stored on this shard, as global (u, v) vertex-id pairs.
    pub local_edges: Vec<(usize, usize)>,
    // Sorted global ids of vertices referenced by this shard's edges but
    // owned by a different shard.
    pub mirror_vertices: Vec<usize>,
}
/// A graph partitioned into shards, plus a global vertex-ownership map.
#[derive(Debug, Clone)]
pub struct DistributedGraph {
    // One shard per partition.
    pub shards: Vec<GraphShard>,
    // Total number of vertices across all shards.
    pub n_global_vertices: usize,
    // Length of the original edge list (includes edges that were dropped
    // for referencing out-of-range vertices).
    pub n_global_edges: usize,
    // Maps each global vertex id to its owning shard and local index.
    pub vertex_map: HashMap<usize, VertexLocation>,
}
/// Quality metrics for a partitioning, computed by [`distributed_graph_stats`].
#[derive(Debug, Clone)]
pub struct DistributedStats {
    // max shard size / mean shard size; 1.0 is perfectly balanced.
    pub balance_ratio: f64,
    // Fraction of stored edges whose endpoints have different owners.
    pub edge_cut_fraction: f64,
    // (owned vertices + mirror entries) / owned vertices; >= 1.0.
    pub replication_factor: f64,
    // Number of owned vertices per shard.
    pub shard_sizes: Vec<usize>,
}
/// Modulo placement of `vertex` into one of `n_partitions` shards.
///
/// A zero partition count degrades to shard 0 instead of panicking on a
/// division by zero.
#[inline]
pub fn hash_partition(vertex: usize, n_partitions: usize) -> usize {
    vertex.checked_rem(n_partitions).unwrap_or(0)
}
/// Greedy streaming (Fennel-style) vertex partitioning.
///
/// Edges are consumed in order; the first time an endpoint is seen it is
/// placed on the partition maximizing `neighbours_already_there -
/// alpha * load^gamma`. Vertices that never appear in an edge fall back to
/// [`hash_partition`]. Returns one partition id per vertex.
pub fn fennel_partition(
    edges: &[(usize, usize)],
    n_vertices: usize,
    n_partitions: usize,
    config: &DistributedGraphConfig,
) -> Vec<usize> {
    // Degenerate inputs: everything lands on partition 0 (or an empty vec).
    if n_partitions == 0 || n_vertices == 0 {
        return vec![0; n_vertices];
    }
    let n_edges = edges.len() as f64;
    // Load-balance penalty coefficients: cost of a partition of size s is
    // alpha * s^gamma.
    let gamma = 1.5_f64;
    let alpha = (n_edges / (n_partitions as f64).powf(gamma)).sqrt();
    let mut placed: Vec<Option<usize>> = vec![None; n_vertices];
    let mut load: Vec<f64> = vec![0.0; n_partitions];
    let mut neighbours: Vec<Vec<usize>> = vec![Vec::new(); n_vertices];
    let _ = config; // reserved for future tuning knobs
    for &(a, b) in edges {
        // Adjacency is recorded only when both endpoints are in range.
        if a < n_vertices && b < n_vertices {
            neighbours[a].push(b);
            neighbours[b].push(a);
        }
        // Greedily place each endpoint the first time it is encountered.
        for &node in &[a, b] {
            if node >= n_vertices || placed[node].is_some() {
                continue;
            }
            // Attraction toward each partition: count of already-placed
            // neighbours living there.
            let mut pulls: Vec<f64> = vec![0.0; n_partitions];
            for &nb in &neighbours[node] {
                if nb < n_vertices {
                    if let Some(p) = placed[nb] {
                        pulls[p] += 1.0;
                    }
                }
            }
            // Pick the partition with the best (attraction - penalty) score;
            // ties resolve to the last maximal partition, as max_by does.
            let choice = (0..n_partitions)
                .map(|p| {
                    let sz = load[p];
                    let score = pulls[p] - alpha * sz.powf(gamma);
                    (p, score)
                })
                .max_by(|x, y| x.1.partial_cmp(&y.1).unwrap_or(std::cmp::Ordering::Equal))
                .map(|(p, _)| p)
                .unwrap_or(0);
            placed[node] = Some(choice);
            load[choice] += 1.0;
        }
    }
    // Vertices untouched by any edge get a deterministic hash fallback.
    (0..n_vertices)
        .map(|v| placed[v].unwrap_or_else(|| hash_partition(v, n_partitions)))
        .collect()
}
/// Partition `edges` over `config.n_partitions` shards and assemble the
/// full [`DistributedGraph`]: per-shard vertex/edge lists, the global
/// vertex-ownership map, and sorted mirror lists.
///
/// Edges with an endpoint `>= n_vertices` are skipped when filling shards
/// but still counted in `n_global_edges`.
pub fn build_distributed_graph(
    edges: &[(usize, usize)],
    n_vertices: usize,
    config: &DistributedGraphConfig,
) -> DistributedGraph {
    // Guard against a zero-partition config.
    let n_partitions = config.n_partitions.max(1);
    // Owner partition for every vertex, per the configured method.
    let vertex_partition: Vec<usize> = match config.partition_method {
        GraphPartitionMethod::HashBased | GraphPartitionMethod::EdgeCut => (0..n_vertices)
            .map(|v| hash_partition(v, n_partitions))
            .collect(),
        GraphPartitionMethod::VertexCut => {
            // Vertex ownership is still hash-based; vertex-cut only changes
            // how edges are placed in the loop below.
            (0..n_vertices)
                .map(|v| hash_partition(v, n_partitions))
                .collect()
        }
        GraphPartitionMethod::Fennel => fennel_partition(edges, n_vertices, n_partitions, config),
    };
    // Global degrees, used by the vertex-cut edge-placement heuristic.
    let mut degree: Vec<usize> = vec![0usize; n_vertices];
    for &(u, v) in edges {
        if u < n_vertices {
            degree[u] += 1;
        }
        if v < n_vertices {
            degree[v] += 1;
        }
    }
    // Assign each vertex to its owner shard; local_id is the position in
    // that shard's local_vertices list (insertion order by global id).
    let mut shards: Vec<GraphShard> = (0..n_partitions).map(|_| GraphShard::default()).collect();
    let mut vertex_map: HashMap<usize, VertexLocation> = HashMap::with_capacity(n_vertices);
    for v in 0..n_vertices {
        let pid = vertex_partition[v];
        // Defensive clamp; partitioners already return ids < n_partitions.
        let pid = pid.min(n_partitions - 1);
        let local_id = shards[pid].local_vertices.len();
        shards[pid].local_vertices.push(v);
        vertex_map.insert(
            v,
            VertexLocation {
                partition_id: pid,
                local_id,
            },
        );
    }
    // Place every in-range edge on exactly one shard and collect mirrors:
    // endpoints referenced by a shard's edges but owned elsewhere.
    let mut mirror_sets: Vec<std::collections::HashSet<usize>> = (0..n_partitions)
        .map(|_| std::collections::HashSet::new())
        .collect();
    for &(u, v) in edges {
        if u >= n_vertices || v >= n_vertices {
            continue;
        }
        let shard_idx = match config.partition_method {
            GraphPartitionMethod::VertexCut => {
                // Edge goes to the owner of its lower-degree endpoint
                // (ties favour u).
                if degree[u] <= degree[v] {
                    vertex_partition[u].min(n_partitions - 1)
                } else {
                    vertex_partition[v].min(n_partitions - 1)
                }
            }
            _ => {
                // All other methods store the edge on u's owner shard.
                vertex_partition[u].min(n_partitions - 1)
            }
        };
        shards[shard_idx].local_edges.push((u, v));
        let owner_u = vertex_partition[u].min(n_partitions - 1);
        let owner_v = vertex_partition[v].min(n_partitions - 1);
        // Mirrors are recorded on the edge-holding shard only.
        if owner_u != shard_idx {
            mirror_sets[shard_idx].insert(u);
        }
        if owner_v != shard_idx {
            mirror_sets[shard_idx].insert(v);
        }
    }
    // Deterministic (sorted) mirror lists per shard.
    for (pid, set) in mirror_sets.into_iter().enumerate() {
        let mut mirrors: Vec<usize> = set.into_iter().collect();
        mirrors.sort_unstable();
        shards[pid].mirror_vertices = mirrors;
    }
    DistributedGraph {
        shards,
        n_global_vertices: n_vertices,
        // Note: counts the raw input list, including skipped edges.
        n_global_edges: edges.len(),
        vertex_map,
    }
}
/// Degree of `vertex` counted over the edges stored on its *owner* shard.
///
/// Edges incident to `vertex` but stored on other shards are not counted.
/// Returns `None` when the vertex is unknown or its shard id is invalid.
pub fn distributed_degree(dg: &DistributedGraph, vertex: usize) -> Option<usize> {
    let location = dg.vertex_map.get(&vertex)?;
    let shard = dg.shards.get(location.partition_id)?;
    let mut count = 0usize;
    for &(a, b) in &shard.local_edges {
        if a == vertex || b == vertex {
            count += 1;
        }
    }
    Some(count)
}
/// Neighbours of `vertex` visible from its owner shard, in stored-edge order.
///
/// Only edges stored on the owner shard contribute; returns `None` when the
/// vertex is unknown or its shard id is invalid.
pub fn distributed_neighbors(dg: &DistributedGraph, vertex: usize) -> Option<Vec<usize>> {
    let location = dg.vertex_map.get(&vertex)?;
    let shard = dg.shards.get(location.partition_id)?;
    let mut out = Vec::new();
    for &(a, b) in &shard.local_edges {
        if a == vertex {
            out.push(b);
        } else if b == vertex {
            out.push(a);
        }
    }
    Some(out)
}
/// Compute partitioning-quality metrics for a built [`DistributedGraph`].
pub fn distributed_graph_stats(dg: &DistributedGraph) -> DistributedStats {
    let shard_sizes: Vec<usize> = dg.shards.iter().map(|s| s.local_vertices.len()).collect();
    let total_verts: usize = shard_sizes.iter().sum();
    let n_shards = dg.shards.len();

    // Balance: largest shard relative to the mean shard size.
    let avg_size = if n_shards > 0 {
        total_verts as f64 / n_shards as f64
    } else {
        1.0
    };
    let largest = shard_sizes.iter().copied().max().unwrap_or(0) as f64;
    let balance_ratio = if avg_size > 0.0 {
        largest / avg_size
    } else {
        1.0
    };

    // An edge is "cut" when its endpoints are owned by different shards;
    // unknown vertices default to partition 0.
    let owner = |v: usize| {
        dg.vertex_map
            .get(&v)
            .map(|loc| loc.partition_id)
            .unwrap_or(0)
    };
    let cut_edges: usize = dg
        .shards
        .iter()
        .flat_map(|shard| shard.local_edges.iter())
        .filter(|&&(u, v)| owner(u) != owner(v))
        .count();
    let edge_cut_fraction = if dg.n_global_edges > 0 {
        cut_edges as f64 / dg.n_global_edges as f64
    } else {
        0.0
    };

    // Replication: (masters + mirror entries) / masters.
    let total_mirror: usize = dg.shards.iter().map(|s| s.mirror_vertices.len()).sum();
    let replication_factor = if total_verts > 0 {
        (total_verts + total_mirror) as f64 / total_verts as f64
    } else {
        1.0
    };

    DistributedStats {
        balance_ratio,
        edge_cut_fraction,
        replication_factor,
        shard_sizes,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Petersen graph: outer 5-cycle, inner pentagram, and five spokes.
    fn petersen_edges() -> Vec<(usize, usize)> {
        vec![
            (0, 1), (1, 2), (2, 3), (3, 4), (4, 0), // outer cycle
            (5, 7), (7, 9), (9, 6), (6, 8), (8, 5), // inner pentagram
            (0, 5), (1, 6), (2, 7), (3, 8), (4, 9), // spokes
        ]
    }

    #[test]
    fn test_distributed_graph_petersen_all_vertices() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig::default();
        let dg = build_distributed_graph(&edges, 10, &cfg);
        assert_eq!(dg.n_global_vertices, 10);
        assert_eq!(dg.n_global_edges, 15);
        assert_eq!(dg.vertex_map.len(), 10);
        // Every vertex must have a recorded owner.
        for v in 0..10usize {
            assert!(
                dg.vertex_map.contains_key(&v),
                "Vertex {v} missing from vertex_map"
            );
        }
    }

    #[test]
    fn test_distributed_graph_petersen_stats() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig::default();
        let dg = build_distributed_graph(&edges, 10, &cfg);
        let stats = distributed_graph_stats(&dg);
        assert!(
            stats.balance_ratio <= 2.0,
            "balance_ratio {:.2} too high",
            stats.balance_ratio
        );
        // Shard sizes must account for all 10 vertices.
        let total: usize = stats.shard_sizes.iter().sum();
        assert_eq!(total, 10);
    }

    #[test]
    fn test_distributed_degree_petersen() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig::default();
        let dg = build_distributed_graph(&edges, 10, &cfg);
        // Every Petersen vertex appears as a first endpoint, so each owner
        // shard stores at least one incident edge.
        for v in 0..10usize {
            let deg = distributed_degree(&dg, v);
            assert!(
                deg.is_some(),
                "distributed_degree returned None for vertex {v}"
            );
            assert!(deg.unwrap() > 0, "Vertex {v} has 0 degree in its shard");
        }
    }

    #[test]
    fn test_distributed_neighbors_petersen() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig::default();
        let dg = build_distributed_graph(&edges, 10, &cfg);
        for v in 0..10usize {
            let nb = distributed_neighbors(&dg, v);
            assert!(nb.is_some(), "distributed_neighbors returned None for {v}");
        }
    }

    #[test]
    fn test_fennel_partition_100_vertices() {
        let n = 100usize;
        // Deterministic pseudo-random edge list; self-loops filtered out.
        let edges: Vec<(usize, usize)> = (0..200)
            .map(|i| ((i * 7 + 3) % n, (i * 13 + 17) % n))
            .filter(|(u, v)| u != v)
            .collect();
        let cfg = DistributedGraphConfig {
            n_partitions: 4,
            partition_method: GraphPartitionMethod::Fennel,
            replication_factor: 0,
        };
        let assignment = fennel_partition(&edges, n, 4, &cfg);
        assert_eq!(assignment.len(), n);
        for (v, &p) in assignment.iter().enumerate() {
            assert!(p < 4, "Vertex {v} assigned to invalid partition {p}");
        }
    }

    #[test]
    fn test_build_distributed_graph_fennel() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig {
            n_partitions: 4,
            partition_method: GraphPartitionMethod::Fennel,
            replication_factor: 0,
        };
        let dg = build_distributed_graph(&edges, 10, &cfg);
        assert_eq!(dg.vertex_map.len(), 10);
        for v in 0..10usize {
            assert!(dg.vertex_map.contains_key(&v));
        }
    }

    #[test]
    fn test_build_distributed_graph_vertex_cut() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig {
            n_partitions: 4,
            partition_method: GraphPartitionMethod::VertexCut,
            replication_factor: 0,
        };
        let dg = build_distributed_graph(&edges, 10, &cfg);
        assert_eq!(dg.n_global_vertices, 10);
        assert_eq!(dg.vertex_map.len(), 10);
    }

    #[test]
    fn test_hash_partition() {
        assert_eq!(hash_partition(0, 4), 0);
        assert_eq!(hash_partition(5, 4), 1);
        assert_eq!(hash_partition(7, 4), 3);
        // Zero partitions degrades to shard 0 instead of panicking.
        assert_eq!(hash_partition(0, 0), 0);
    }

    #[test]
    fn test_stats_edge_cut_fraction_range() {
        let edges = petersen_edges();
        let cfg = DistributedGraphConfig::default();
        let dg = build_distributed_graph(&edges, 10, &cfg);
        let stats = distributed_graph_stats(&dg);
        assert!(
            stats.edge_cut_fraction >= 0.0 && stats.edge_cut_fraction <= 1.0,
            "edge_cut_fraction = {} out of range",
            stats.edge_cut_fraction
        );
    }
}