1use scirs2_core::ndarray::Array2;
13
14use crate::error::{GraphError, Result};
15
16use super::types::{PartitionConfig, PartitionResult};
17
18pub fn streaming_partition(
40 edges: &[(usize, usize)],
41 n_nodes: usize,
42 config: &PartitionConfig,
43) -> Result<PartitionResult> {
44 let k = config.n_partitions;
45
46 if k < 2 {
47 return Err(GraphError::InvalidParameter {
48 param: "n_partitions".to_string(),
49 value: format!("{}", k),
50 expected: "at least 2".to_string(),
51 context: "streaming_partition".to_string(),
52 });
53 }
54
55 if n_nodes < 2 {
56 return Err(GraphError::InvalidParameter {
57 param: "n_nodes".to_string(),
58 value: format!("{}", n_nodes),
59 expected: "at least 2".to_string(),
60 context: "streaming_partition".to_string(),
61 });
62 }
63
64 if k > n_nodes {
65 return Err(GraphError::InvalidParameter {
66 param: "n_partitions".to_string(),
67 value: format!("{}", k),
68 expected: format!("at most {} (number of nodes)", n_nodes),
69 context: "streaming_partition".to_string(),
70 });
71 }
72
73 let mut adj_list: Vec<Vec<usize>> = vec![Vec::new(); n_nodes];
75 for &(u, v) in edges {
76 if u < n_nodes && v < n_nodes {
77 adj_list[u].push(v);
78 }
79 }
80
81 let capacity =
83 ((n_nodes as f64) * (1.0 + config.balance_tolerance) / (k as f64)).ceil() as usize;
84
85 let mut assignments = vec![usize::MAX; n_nodes];
86 let mut partition_sizes = vec![0usize; k];
87
88 for node in 0..n_nodes {
90 let mut best_partition = 0usize;
91 let mut best_score = f64::NEG_INFINITY;
92
93 for p in 0..k {
94 if partition_sizes[p] >= capacity {
95 continue;
96 }
97
98 let neighbors_in_p = adj_list[node]
100 .iter()
101 .filter(|&&nbr| nbr < node && assignments[nbr] == p)
102 .count();
103
104 let load_factor = partition_sizes[p] as f64 / capacity as f64;
106 let score = (neighbors_in_p as f64) * (1.0 - load_factor);
107
108 if score > best_score
110 || (score == best_score && partition_sizes[p] < partition_sizes[best_partition])
111 {
112 best_score = score;
113 best_partition = p;
114 }
115 }
116
117 assignments[node] = best_partition;
118 partition_sizes[best_partition] += 1;
119 }
120
121 let mut edge_cut = 0usize;
123 let mut seen_edges = std::collections::HashSet::new();
124 for &(u, v) in edges {
125 if u < n_nodes && v < n_nodes && u != v {
126 let key = if u < v { (u, v) } else { (v, u) };
127 if seen_edges.insert(key) && assignments[u] != assignments[v] {
128 edge_cut += 1;
129 }
130 }
131 }
132
133 let ideal = n_nodes as f64 / k as f64;
135 let imbalance = if ideal > 0.0 {
136 partition_sizes
137 .iter()
138 .map(|&s| ((s as f64) - ideal).abs() / ideal)
139 .fold(0.0f64, f64::max)
140 } else {
141 0.0
142 };
143
144 Ok(PartitionResult {
145 assignments,
146 edge_cut,
147 partition_sizes,
148 imbalance,
149 })
150}
151
152pub fn hash_partition(n_nodes: usize, n_partitions: usize) -> PartitionResult {
164 let mut assignments = vec![0usize; n_nodes];
165 let mut partition_sizes = vec![0usize; n_partitions];
166
167 for i in 0..n_nodes {
168 let p = i % n_partitions;
169 assignments[i] = p;
170 partition_sizes[p] += 1;
171 }
172
173 let ideal = n_nodes as f64 / n_partitions as f64;
174 let imbalance = if ideal > 0.0 {
175 partition_sizes
176 .iter()
177 .map(|&s| ((s as f64) - ideal).abs() / ideal)
178 .fold(0.0f64, f64::max)
179 } else {
180 0.0
181 };
182
183 PartitionResult {
184 assignments,
185 edge_cut: 0, partition_sizes,
187 imbalance,
188 }
189}
190
191pub fn evaluate_partition(
204 adj: &Array2<f64>,
205 assignments: &[usize],
206 n_partitions: usize,
207) -> (usize, f64) {
208 let n = adj.nrows().min(assignments.len());
209
210 let mut partition_sizes = vec![0usize; n_partitions];
211 for &a in &assignments[..n] {
212 if a < n_partitions {
213 partition_sizes[a] += 1;
214 }
215 }
216
217 let mut edge_cut = 0usize;
218 for i in 0..n {
219 for j in (i + 1)..n {
220 if adj[[i, j]].abs() > 1e-15 && assignments[i] != assignments[j] {
221 edge_cut += 1;
222 }
223 }
224 }
225
226 let ideal = n as f64 / n_partitions as f64;
227 let imbalance = if ideal > 0.0 {
228 partition_sizes
229 .iter()
230 .map(|&s| ((s as f64) - ideal).abs() / ideal)
231 .fold(0.0f64, f64::max)
232 } else {
233 0.0
234 };
235
236 (edge_cut, imbalance)
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::Array2;

    /// Edge list (both directions of every edge) for two `n`-cliques joined by
    /// a single bridge between nodes `n - 1` and `n`, plus the node count.
    fn two_cliques_edges(n: usize) -> (Vec<(usize, usize)>, usize) {
        let size = 2 * n;
        let mut edges = Vec::new();
        // First clique covers `0..n`, second covers `n..size`.
        for (lo, hi) in [(0, n), (n, size)] {
            for i in lo..hi {
                for j in (i + 1)..hi {
                    edges.extend([(i, j), (j, i)]);
                }
            }
        }
        // Bridge between the cliques.
        edges.extend([(n - 1, n), (n, n - 1)]);
        (edges, size)
    }

    /// Dense 0/1 adjacency matrix with a `1.0` at `[u, v]` for each listed edge.
    fn dense_adjacency(edges: &[(usize, usize)], n_nodes: usize) -> Array2<f64> {
        let mut adj = Array2::<f64>::zeros((n_nodes, n_nodes));
        for &(u, v) in edges {
            adj[[u, v]] = 1.0;
        }
        adj
    }

    /// Two-partition configuration with the given balance tolerance.
    fn two_way_config(balance_tolerance: f64) -> PartitionConfig {
        PartitionConfig {
            n_partitions: 2,
            balance_tolerance,
            ..PartitionConfig::default()
        }
    }

    #[test]
    fn test_ldg_better_than_hash_on_structured() {
        let (edges, n_nodes) = two_cliques_edges(6);
        let config = two_way_config(0.1);

        let ldg_result = streaming_partition(&edges, n_nodes, &config).expect("LDG should succeed");

        // Score the hash baseline on the same graph.
        let adj = dense_adjacency(&edges, n_nodes);
        let hash_result = hash_partition(n_nodes, 2);
        let (hash_cut, _) = evaluate_partition(&adj, &hash_result.assignments, 2);

        assert!(
            ldg_result.edge_cut <= hash_cut + 2,
            "LDG edge cut ({}) should be competitive with hash ({})",
            ldg_result.edge_cut,
            hash_cut
        );
    }

    #[test]
    fn test_hash_uniform_sizes() {
        let k = 4;
        let result = hash_partition(100, k);
        assert_eq!(result.partition_sizes.len(), k);
        assert_eq!(result.partition_sizes, vec![25; k]);
        assert!(result.imbalance < 1e-10);
    }

    #[test]
    fn test_hash_near_uniform_sizes() {
        let (n_nodes, k) = (10, 3);
        let result = hash_partition(n_nodes, k);
        assert_eq!(result.partition_sizes.len(), k);
        assert_eq!(result.partition_sizes.iter().sum::<usize>(), n_nodes);
        // 10 nodes over 3 partitions: every size is 3 or 4.
        assert!(result.partition_sizes.iter().all(|s| (3..=4).contains(s)));
    }

    #[test]
    fn test_evaluate_partition() {
        // Path 0 - 1 - 2 - 3, stored symmetrically.
        let mut adj = Array2::<f64>::zeros((4, 4));
        for (u, v) in [(0, 1), (2, 3), (1, 2)] {
            adj[[u, v]] = 1.0;
            adj[[v, u]] = 1.0;
        }

        let assignments = vec![0, 0, 1, 1];
        let (cut, imbalance) = evaluate_partition(&adj, &assignments, 2);
        // Only the 1 - 2 edge crosses the partition boundary.
        assert_eq!(cut, 1);
        assert!(imbalance < 1e-10);
    }

    #[test]
    fn test_single_node_trivial() {
        // Smallest accepted input: two nodes joined by one edge.
        let edges = vec![(0, 1), (1, 0)];
        let config = two_way_config(0.5);
        let result = streaming_partition(&edges, 2, &config).expect("should succeed");

        assert_eq!(result.assignments.len(), 2);
        assert!(result.assignments[0] < 2);
        assert!(result.assignments[1] < 2);
        assert_eq!(result.partition_sizes.iter().sum::<usize>(), 2);
    }

    #[test]
    fn test_streaming_invalid_params() {
        // Fewer than two partitions.
        let config = PartitionConfig {
            n_partitions: 1,
            ..PartitionConfig::default()
        };
        assert!(streaming_partition(&[], 10, &config).is_err());

        // More partitions than nodes.
        let config2 = PartitionConfig {
            n_partitions: 5,
            ..PartitionConfig::default()
        };
        assert!(streaming_partition(&[], 3, &config2).is_err());
    }

    #[test]
    fn test_edge_cut_computable() {
        let (edges, n_nodes) = two_cliques_edges(4);
        let adj = dense_adjacency(&edges, n_nodes);
        let config = two_way_config(0.2);

        let result = streaming_partition(&edges, n_nodes, &config).expect("should succeed");

        // The reported cut must agree with an independent evaluation.
        let (eval_cut, _) = evaluate_partition(&adj, &result.assignments, 2);
        assert_eq!(result.edge_cut, eval_cut);
    }
}
395
/// Placement rule used by [`StreamingPartitioner::assign_vertex`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamingPartitionAlgorithm {
    /// Score each partition with room as
    /// `neighbour_weight - gamma * size^alpha` and take the best.
    Fennel,
    /// Place vertex `v` in partition `v % n_parts`; neighbours are ignored.
    Hashing,
    /// Score each partition with room as
    /// `neighbour_weight * (1 - size / capacity)` and take the best.
    LinearDeterministic,
}
412
413#[derive(Debug, Clone)]
415pub struct StreamingPartitionConfig {
416 pub n_parts: usize,
418 pub algorithm: StreamingPartitionAlgorithm,
420 pub gamma: f64,
422 pub alpha: f64,
425}
426
427impl Default for StreamingPartitionConfig {
428 fn default() -> Self {
429 Self {
430 n_parts: 2,
431 algorithm: StreamingPartitionAlgorithm::Fennel,
432 gamma: 1.5,
433 alpha: 1.5,
434 }
435 }
436}
437
438pub struct StreamingPartitioner {
456 config: StreamingPartitionConfig,
457 partition: Vec<Option<usize>>,
459 part_sizes: Vec<usize>,
461 n_assigned: usize,
463}
464
465impl StreamingPartitioner {
466 pub fn new(n_nodes: usize, config: StreamingPartitionConfig) -> Self {
468 let k = config.n_parts;
469 Self {
470 config,
471 partition: vec![None; n_nodes],
472 part_sizes: vec![0usize; k],
473 n_assigned: 0,
474 }
475 }
476
477 pub fn assign_vertex(&mut self, v: usize, neighbors: &[(usize, f64)]) -> usize {
482 if v >= self.partition.len() {
483 self.partition.resize(v + 1, None);
485 }
486
487 if let Some(p) = self.partition[v] {
489 return p;
490 }
491
492 let k = self.config.n_parts;
493 let n_total = self.partition.len().max(1);
494 let cap = ((n_total as f64 * 1.05) / k as f64).ceil() as usize;
496
497 let best_p = match self.config.algorithm {
498 StreamingPartitionAlgorithm::Hashing => v % k,
499 StreamingPartitionAlgorithm::LinearDeterministic => {
500 let mut best = 0usize;
501 let mut best_score = f64::NEG_INFINITY;
502 for p in 0..k {
503 if self.part_sizes[p] >= cap {
504 continue;
505 }
506 let nbrs_in_p: f64 = neighbors
507 .iter()
508 .filter(|&&(nb, _)| {
509 nb < self.partition.len() && self.partition[nb] == Some(p)
510 })
511 .map(|&(_, w)| w)
512 .sum();
513 let load = self.part_sizes[p] as f64 / cap as f64;
514 let score = nbrs_in_p * (1.0 - load);
515 if score > best_score
516 || (score == best_score && self.part_sizes[p] < self.part_sizes[best])
517 {
518 best_score = score;
519 best = p;
520 }
521 }
522 best
523 }
524 StreamingPartitionAlgorithm::Fennel => {
525 let gamma = self.config.gamma;
527 let alpha = if self.config.alpha <= 0.0 {
528 (k as f64).sqrt() / (n_total as f64).max(1.0)
530 } else {
531 self.config.alpha
532 };
533
534 let mut best = 0usize;
535 let mut best_score = f64::NEG_INFINITY;
536 for p in 0..k {
537 if self.part_sizes[p] >= cap {
538 continue;
539 }
540 let nbrs_in_p: f64 = neighbors
541 .iter()
542 .filter(|&&(nb, _)| {
543 nb < self.partition.len() && self.partition[nb] == Some(p)
544 })
545 .map(|&(_, w)| w)
546 .sum();
547 let penalty = gamma * (self.part_sizes[p] as f64).powf(alpha);
548 let score = nbrs_in_p - penalty;
549 if score > best_score
550 || (score == best_score && self.part_sizes[p] < self.part_sizes[best])
551 {
552 best_score = score;
553 best = p;
554 }
555 }
556 best
557 }
558 };
559
560 self.partition[v] = Some(best_p);
561 self.part_sizes[best_p] += 1;
562 self.n_assigned += 1;
563 best_p
564 }
565
566 pub fn current_partition(&self) -> &[Option<usize>] {
568 &self.partition
569 }
570
571 pub fn edge_cut_estimate(&self, adj: &[Vec<(usize, f64)>]) -> usize {
576 let mut cut = 0usize;
577 for (i, nbrs) in adj.iter().enumerate() {
578 let pi = match self.partition.get(i).copied().flatten() {
579 Some(p) => p,
580 None => continue,
581 };
582 for &(j, _) in nbrs {
583 if j <= i {
584 continue; }
586 let pj = match self.partition.get(j).copied().flatten() {
587 Some(p) => p,
588 None => continue,
589 };
590 if pi != pj {
591 cut += 1;
592 }
593 }
594 }
595 cut
596 }
597}
598
#[cfg(test)]
mod streaming_partitioner_tests {
    use super::*;

    /// Symmetric unit-weight adjacency lists for the path `0 - 1 - ... - (n - 1)`.
    fn build_path_adj(n: usize) -> Vec<Vec<(usize, f64)>> {
        let mut adj = vec![vec![]; n];
        for left in 0..(n - 1) {
            let right = left + 1;
            adj[left].push((right, 1.0));
            adj[right].push((left, 1.0));
        }
        adj
    }

    /// Default configuration with the partition count and algorithm overridden.
    fn config_for(
        n_parts: usize,
        algorithm: StreamingPartitionAlgorithm,
    ) -> StreamingPartitionConfig {
        StreamingPartitionConfig {
            n_parts,
            algorithm,
            ..StreamingPartitionConfig::default()
        }
    }

    #[test]
    fn test_streaming_fennel_assignment() {
        let n = 20;
        let adj = build_path_adj(n);
        let mut sp =
            StreamingPartitioner::new(n, config_for(4, StreamingPartitionAlgorithm::Fennel));

        for (v, nbrs) in adj.iter().enumerate() {
            let p = sp.assign_vertex(v, nbrs);
            assert!(p < 4, "part {} out of range", p);
        }

        // Every slot must have received a partition.
        assert!(
            sp.current_partition().iter().all(Option::is_some),
            "node should be assigned"
        );
    }

    #[test]
    fn test_streaming_hashing_uniform() {
        let n = 100;
        let mut sp =
            StreamingPartitioner::new(n, config_for(4, StreamingPartitionAlgorithm::Hashing));
        (0..n).for_each(|v| {
            sp.assign_vertex(v, &[]);
        });

        // 100 vertices over 4 partitions by index modulo: 25 each.
        for &size in &sp.part_sizes {
            assert_eq!(size, 25, "hash partition should be uniform");
        }
    }

    #[test]
    fn test_streaming_ldg_assigns_all() {
        let n = 30;
        let adj = build_path_adj(n);
        let mut sp = StreamingPartitioner::new(
            n,
            config_for(3, StreamingPartitionAlgorithm::LinearDeterministic),
        );

        for (v, nbrs) in adj.iter().enumerate() {
            assert!(sp.assign_vertex(v, nbrs) < 3);
        }
        assert_eq!(sp.part_sizes.iter().sum::<usize>(), n);
    }

    #[test]
    fn test_streaming_edge_cut_estimate() {
        let n = 10;
        let adj = build_path_adj(n);
        let mut sp =
            StreamingPartitioner::new(n, config_for(2, StreamingPartitionAlgorithm::Fennel));
        for (v, nbrs) in adj.iter().enumerate() {
            sp.assign_vertex(v, nbrs);
        }

        let cut = sp.edge_cut_estimate(&adj);
        assert!(cut <= n, "edge cut {} should be <= n={}", cut, n);
    }
}