1use std::collections::HashMap;
33
/// Strategy that decides which partition owns each vertex and which shard
/// stores each edge.
///
/// The enum is `#[non_exhaustive]`, so `match` expressions outside the
/// defining crate must include a wildcard arm.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum GraphPartitionMethod {
    /// Vertex `v` is owned by partition `v % n_partitions`; every edge is
    /// stored in the shard that owns its first endpoint.
    HashBased,
    /// Handled exactly like [`GraphPartitionMethod::HashBased`] by
    /// [`build_distributed_graph`].
    EdgeCut,
    /// Vertex ownership is hashed as for [`GraphPartitionMethod::HashBased`],
    /// but each edge is stored in the shard owning its lower-degree endpoint
    /// (the first endpoint wins ties).
    VertexCut,
    /// Vertex ownership is computed by [`fennel_partition`]; every edge is
    /// stored in the shard that owns its first endpoint.
    Fennel,
}
52
53#[derive(Debug, Clone)]
55pub struct DistributedGraphConfig {
56 pub n_partitions: usize,
58 pub partition_method: GraphPartitionMethod,
60 pub replication_factor: usize,
62}
63
64impl Default for DistributedGraphConfig {
65 fn default() -> Self {
66 Self {
67 n_partitions: 4,
68 partition_method: GraphPartitionMethod::HashBased,
69 replication_factor: 0,
70 }
71 }
72}
73
/// Where a global vertex lives inside a [`DistributedGraph`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct VertexLocation {
    /// Index of the owning shard in `DistributedGraph::shards`.
    pub partition_id: usize,
    /// Position of the vertex inside the owning shard's `local_vertices`.
    pub local_id: usize,
}
82
/// The slice of the graph held by a single partition.
///
/// All ids stored here are global vertex ids.
#[derive(Clone, Debug, Default)]
pub struct GraphShard {
    /// Vertices owned by this shard, in the order they were assigned.
    pub local_vertices: Vec<usize>,
    /// Edges stored in this shard as `(u, v)` pairs.
    pub local_edges: Vec<(usize, usize)>,
    /// Endpoints of `local_edges` that are owned by a different shard.
    /// [`build_distributed_graph`] stores them sorted in ascending order.
    pub mirror_vertices: Vec<usize>,
}
93
94#[derive(Debug, Clone)]
96pub struct DistributedGraph {
97 pub shards: Vec<GraphShard>,
99 pub n_global_vertices: usize,
101 pub n_global_edges: usize,
103 pub vertex_map: HashMap<usize, VertexLocation>,
105}
106
/// Quality metrics reported by [`distributed_graph_stats`].
#[derive(Clone, Debug)]
pub struct DistributedStats {
    /// Largest shard size divided by the average shard size.
    pub balance_ratio: f64,
    /// Share of edges whose endpoints are owned by different partitions.
    pub edge_cut_fraction: f64,
    /// `(owned vertices + mirror vertices) / owned vertices`.
    pub replication_factor: f64,
    /// Number of owned vertices per shard, indexed by partition id.
    pub shard_sizes: Vec<usize>,
}
119
/// Maps `vertex` to a partition by taking it modulo `n_partitions`.
///
/// Returns `0` when `n_partitions` is `0`, so the call never divides by zero.
#[inline]
pub fn hash_partition(vertex: usize, n_partitions: usize) -> usize {
    match n_partitions {
        0 => 0,
        parts => vertex % parts,
    }
}
132
133pub fn fennel_partition(
143 edges: &[(usize, usize)],
144 n_vertices: usize,
145 n_partitions: usize,
146 config: &DistributedGraphConfig,
147) -> Vec<usize> {
148 if n_partitions == 0 || n_vertices == 0 {
149 return vec![0; n_vertices];
150 }
151
152 let n_edges = edges.len() as f64;
153 let gamma = 1.5_f64;
154 let alpha = (n_edges / (n_partitions as f64).powf(gamma)).sqrt();
156
157 let mut assignment: Vec<Option<usize>> = vec![None; n_vertices];
159 let mut partition_sizes: Vec<f64> = vec![0.0; n_partitions];
161 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n_vertices];
163
164 let _ = config; for &(u, v) in edges {
167 if u < n_vertices && v < n_vertices {
169 adj[u].push(v);
170 adj[v].push(u);
171 }
172
173 for &vertex in &[u, v] {
175 if vertex >= n_vertices || assignment[vertex].is_some() {
176 continue;
177 }
178 let mut neighbour_counts: Vec<f64> = vec![0.0; n_partitions];
180 for &nb in &adj[vertex] {
181 if nb < n_vertices {
182 if let Some(p) = assignment[nb] {
183 neighbour_counts[p] += 1.0;
184 }
185 }
186 }
187 let best = (0..n_partitions)
189 .map(|p| {
190 let sz = partition_sizes[p];
191 let score = neighbour_counts[p] - alpha * sz.powf(gamma);
192 (p, score)
193 })
194 .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
195 .map(|(p, _)| p)
196 .unwrap_or(0);
197
198 assignment[vertex] = Some(best);
199 partition_sizes[best] += 1.0;
200 }
201 }
202
203 assignment
205 .iter()
206 .enumerate()
207 .map(|(v, a)| a.unwrap_or_else(|| hash_partition(v, n_partitions)))
208 .collect()
209}
210
211pub fn build_distributed_graph(
226 edges: &[(usize, usize)],
227 n_vertices: usize,
228 config: &DistributedGraphConfig,
229) -> DistributedGraph {
230 let n_partitions = config.n_partitions.max(1);
231
232 let vertex_partition: Vec<usize> = match config.partition_method {
234 GraphPartitionMethod::HashBased | GraphPartitionMethod::EdgeCut => (0..n_vertices)
235 .map(|v| hash_partition(v, n_partitions))
236 .collect(),
237 GraphPartitionMethod::VertexCut => {
238 (0..n_vertices)
241 .map(|v| hash_partition(v, n_partitions))
242 .collect()
243 }
244 GraphPartitionMethod::Fennel => fennel_partition(edges, n_vertices, n_partitions, config),
245 };
246
247 let mut degree: Vec<usize> = vec![0usize; n_vertices];
249 for &(u, v) in edges {
250 if u < n_vertices {
251 degree[u] += 1;
252 }
253 if v < n_vertices {
254 degree[v] += 1;
255 }
256 }
257
258 let mut shards: Vec<GraphShard> = (0..n_partitions).map(|_| GraphShard::default()).collect();
260
261 let mut vertex_map: HashMap<usize, VertexLocation> = HashMap::with_capacity(n_vertices);
263 for v in 0..n_vertices {
264 let pid = vertex_partition[v];
265 let pid = pid.min(n_partitions - 1);
266 let local_id = shards[pid].local_vertices.len();
267 shards[pid].local_vertices.push(v);
268 vertex_map.insert(
269 v,
270 VertexLocation {
271 partition_id: pid,
272 local_id,
273 },
274 );
275 }
276
277 let mut mirror_sets: Vec<std::collections::HashSet<usize>> = (0..n_partitions)
280 .map(|_| std::collections::HashSet::new())
281 .collect();
282
283 for &(u, v) in edges {
284 if u >= n_vertices || v >= n_vertices {
285 continue;
286 }
287 let shard_idx = match config.partition_method {
288 GraphPartitionMethod::VertexCut => {
289 if degree[u] <= degree[v] {
291 vertex_partition[u].min(n_partitions - 1)
292 } else {
293 vertex_partition[v].min(n_partitions - 1)
294 }
295 }
296 _ => {
297 vertex_partition[u].min(n_partitions - 1)
299 }
300 };
301
302 shards[shard_idx].local_edges.push((u, v));
303
304 let owner_u = vertex_partition[u].min(n_partitions - 1);
306 let owner_v = vertex_partition[v].min(n_partitions - 1);
307
308 if owner_u != shard_idx {
309 mirror_sets[shard_idx].insert(u);
310 }
311 if owner_v != shard_idx {
312 mirror_sets[shard_idx].insert(v);
313 }
314 }
315
316 for (pid, set) in mirror_sets.into_iter().enumerate() {
318 let mut mirrors: Vec<usize> = set.into_iter().collect();
319 mirrors.sort_unstable();
320 shards[pid].mirror_vertices = mirrors;
321 }
322
323 DistributedGraph {
324 shards,
325 n_global_vertices: n_vertices,
326 n_global_edges: edges.len(),
327 vertex_map,
328 }
329}
330
331pub fn distributed_degree(dg: &DistributedGraph, vertex: usize) -> Option<usize> {
339 let loc = dg.vertex_map.get(&vertex)?;
340 let shard = dg.shards.get(loc.partition_id)?;
341 let degree = shard
342 .local_edges
343 .iter()
344 .filter(|&&(u, v)| u == vertex || v == vertex)
345 .count();
346 Some(degree)
347}
348
349pub fn distributed_neighbors(dg: &DistributedGraph, vertex: usize) -> Option<Vec<usize>> {
353 let loc = dg.vertex_map.get(&vertex)?;
354 let shard = dg.shards.get(loc.partition_id)?;
355 let neighbours: Vec<usize> = shard
356 .local_edges
357 .iter()
358 .filter_map(|&(u, v)| {
359 if u == vertex {
360 Some(v)
361 } else if v == vertex {
362 Some(u)
363 } else {
364 None
365 }
366 })
367 .collect();
368 Some(neighbours)
369}
370
371pub fn distributed_graph_stats(dg: &DistributedGraph) -> DistributedStats {
377 let shard_sizes: Vec<usize> = dg.shards.iter().map(|s| s.local_vertices.len()).collect();
378
379 let total_verts: usize = shard_sizes.iter().sum();
380 let n = dg.shards.len();
381
382 let avg_size = if n > 0 {
383 total_verts as f64 / n as f64
384 } else {
385 1.0
386 };
387 let max_size = shard_sizes.iter().copied().max().unwrap_or(0) as f64;
388 let balance_ratio = if avg_size > 0.0 {
389 max_size / avg_size
390 } else {
391 1.0
392 };
393
394 let mut cut_edges = 0usize;
396 for shard in &dg.shards {
397 for &(u, v) in &shard.local_edges {
398 let p_u = dg
399 .vertex_map
400 .get(&u)
401 .map(|loc| loc.partition_id)
402 .unwrap_or(0);
403 let p_v = dg
404 .vertex_map
405 .get(&v)
406 .map(|loc| loc.partition_id)
407 .unwrap_or(0);
408 if p_u != p_v {
409 cut_edges += 1;
410 }
411 }
412 }
413 let edge_cut_fraction = if dg.n_global_edges > 0 {
414 cut_edges as f64 / dg.n_global_edges as f64
415 } else {
416 0.0
417 };
418
419 let total_mirror: usize = dg.shards.iter().map(|s| s.mirror_vertices.len()).sum();
421 let replication_factor = if total_verts > 0 {
422 (total_verts + total_mirror) as f64 / total_verts as f64
423 } else {
424 1.0
425 };
426
427 DistributedStats {
428 balance_ratio,
429 edge_cut_fraction,
430 replication_factor,
431 shard_sizes,
432 }
433}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// Number of vertices in the Petersen graph fixture.
    const PETERSEN_VERTICES: usize = 10;

    /// Edge list of the Petersen graph: outer 5-cycle, inner pentagram, and
    /// the five spokes joining them.
    fn petersen_edges() -> Vec<(usize, usize)> {
        let outer_cycle = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 0)];
        let inner_star = [(5, 7), (7, 9), (9, 6), (6, 8), (8, 5)];
        let spokes = [(0, 5), (1, 6), (2, 7), (3, 8), (4, 9)];
        outer_cycle
            .iter()
            .chain(inner_star.iter())
            .chain(spokes.iter())
            .copied()
            .collect()
    }

    /// Builds the Petersen graph with the given configuration.
    fn build_petersen(cfg: &DistributedGraphConfig) -> DistributedGraph {
        build_distributed_graph(&petersen_edges(), PETERSEN_VERTICES, cfg)
    }

    /// Four-partition configuration using `method`.
    fn four_way(method: GraphPartitionMethod) -> DistributedGraphConfig {
        DistributedGraphConfig {
            n_partitions: 4,
            partition_method: method,
            replication_factor: 0,
        }
    }

    /// Every vertex must be present in the directory and the global counts
    /// must match the input.
    #[test]
    fn test_distributed_graph_petersen_all_vertices() {
        let dg = build_petersen(&DistributedGraphConfig::default());

        assert_eq!(dg.n_global_vertices, 10);
        assert_eq!(dg.n_global_edges, 15);
        assert_eq!(dg.vertex_map.len(), 10);

        for v in 0..PETERSEN_VERTICES {
            let present = dg.vertex_map.contains_key(&v);
            assert!(present, "Vertex {v} missing from vertex_map");
        }
    }

    /// Hash partitioning keeps shards reasonably balanced and covers every
    /// vertex exactly once.
    #[test]
    fn test_distributed_graph_petersen_stats() {
        let dg = build_petersen(&DistributedGraphConfig::default());
        let stats = distributed_graph_stats(&dg);

        assert!(
            stats.balance_ratio <= 2.0,
            "balance_ratio {:.2} too high",
            stats.balance_ratio
        );

        let covered: usize = stats.shard_sizes.iter().sum();
        assert_eq!(covered, 10);
    }

    /// Every vertex has at least one incident edge stored in its home shard.
    #[test]
    fn test_distributed_degree_petersen() {
        let dg = build_petersen(&DistributedGraphConfig::default());

        for v in 0..PETERSEN_VERTICES {
            let deg = distributed_degree(&dg, v);
            assert!(
                deg.is_some(),
                "distributed_degree returned None for vertex {v}"
            );
            let local_degree = deg.unwrap();
            assert!(local_degree > 0, "Vertex {v} has 0 degree in its shard");
        }
    }

    /// Neighbour lookup succeeds for every vertex.
    #[test]
    fn test_distributed_neighbors_petersen() {
        let dg = build_petersen(&DistributedGraphConfig::default());

        for v in 0..PETERSEN_VERTICES {
            let nb = distributed_neighbors(&dg, v);
            assert!(nb.is_some(), "distributed_neighbors returned None for {v}");
        }
    }

    /// Fennel assigns every vertex of a pseudo-random graph to a valid
    /// partition.
    #[test]
    fn test_fennel_partition_100_vertices() {
        let n = 100usize;

        // Deterministic pseudo-random edges with self-loops removed.
        let mut edges: Vec<(usize, usize)> = Vec::new();
        for i in 0..200 {
            let u = (i * 7 + 3) % n;
            let v = (i * 13 + 17) % n;
            if u != v {
                edges.push((u, v));
            }
        }

        let cfg = four_way(GraphPartitionMethod::Fennel);
        let assignment = fennel_partition(&edges, n, 4, &cfg);

        assert_eq!(assignment.len(), n);
        for (v, &p) in assignment.iter().enumerate() {
            assert!(p < 4, "Vertex {v} assigned to invalid partition {p}");
        }
    }

    /// Building with the Fennel method still registers every vertex.
    #[test]
    fn test_build_distributed_graph_fennel() {
        let dg = build_petersen(&four_way(GraphPartitionMethod::Fennel));

        assert_eq!(dg.vertex_map.len(), 10);
        for v in 0..PETERSEN_VERTICES {
            assert!(dg.vertex_map.contains_key(&v));
        }
    }

    /// Building with the vertex-cut method preserves the vertex count.
    #[test]
    fn test_build_distributed_graph_vertex_cut() {
        let dg = build_petersen(&four_way(GraphPartitionMethod::VertexCut));

        assert_eq!(dg.n_global_vertices, 10);
        assert_eq!(dg.vertex_map.len(), 10);
    }

    /// `hash_partition` is a plain modulo that tolerates zero partitions.
    #[test]
    fn test_hash_partition() {
        let cases = [(0usize, 4usize, 0usize), (5, 4, 1), (7, 4, 3), (0, 0, 0)];
        for (vertex, parts, expected) in cases {
            assert_eq!(hash_partition(vertex, parts), expected);
        }
    }

    /// The edge-cut fraction is a proportion and must stay within `[0, 1]`.
    #[test]
    fn test_stats_edge_cut_fraction_range() {
        let dg = build_petersen(&DistributedGraphConfig::default());
        let stats = distributed_graph_stats(&dg);

        assert!(
            (0.0..=1.0).contains(&stats.edge_cut_fraction),
            "edge_cut_fraction = {} out of range",
            stats.edge_cut_fraction
        );
    }
}