use scirs2_core::ndarray::Array2;
use crate::error::{GraphError, Result};
use super::types::{PartitionConfig, PartitionResult};
/// One-pass Linear Deterministic Greedy (LDG) partitioning of an edge list
/// into `config.n_partitions` parts.
///
/// Nodes `0..n_nodes` are processed in index order; each node is placed in
/// the non-full partition maximizing `neighbors_already_in_p * (1 - load)`,
/// with a per-partition capacity of
/// `ceil(n_nodes * (1 + balance_tolerance) / k)`.
///
/// Only the `u` side of each `(u, v)` edge is recorded in the adjacency
/// list, so undirected inputs are expected to list every edge in both
/// directions (as the tests in this file do). Edges with an endpoint
/// `>= n_nodes` are ignored, and self-loops never count toward the cut.
///
/// # Errors
///
/// Returns `GraphError::InvalidParameter` when `n_partitions < 2`,
/// `n_nodes < 2`, `n_partitions > n_nodes`, or `balance_tolerance` is
/// negative or non-finite.
pub fn streaming_partition(
    edges: &[(usize, usize)],
    n_nodes: usize,
    config: &PartitionConfig,
) -> Result<PartitionResult> {
    let k = config.n_partitions;
    if k < 2 {
        return Err(GraphError::InvalidParameter {
            param: "n_partitions".to_string(),
            value: format!("{}", k),
            expected: "at least 2".to_string(),
            context: "streaming_partition".to_string(),
        });
    }
    if n_nodes < 2 {
        return Err(GraphError::InvalidParameter {
            param: "n_nodes".to_string(),
            value: format!("{}", n_nodes),
            expected: "at least 2".to_string(),
            context: "streaming_partition".to_string(),
        });
    }
    if k > n_nodes {
        return Err(GraphError::InvalidParameter {
            param: "n_partitions".to_string(),
            value: format!("{}", k),
            expected: format!("at most {} (number of nodes)", n_nodes),
            context: "streaming_partition".to_string(),
        });
    }
    // Fix: a negative or non-finite tolerance previously produced a zero (or
    // nonsensical) capacity, which divided by zero in the load factor and
    // silently dumped every node into partition 0. Reject it up front.
    if !config.balance_tolerance.is_finite() || config.balance_tolerance < 0.0 {
        return Err(GraphError::InvalidParameter {
            param: "balance_tolerance".to_string(),
            value: format!("{}", config.balance_tolerance),
            expected: "a finite value >= 0".to_string(),
            context: "streaming_partition".to_string(),
        });
    }
    // Adjacency: only u's list receives v; mirrored input edges supply the
    // reverse direction. Out-of-range endpoints are dropped.
    let mut adj_list: Vec<Vec<usize>> = vec![Vec::new(); n_nodes];
    for &(u, v) in edges {
        if u < n_nodes && v < n_nodes {
            adj_list[u].push(v);
        }
    }
    // With tolerance >= 0 and k <= n_nodes, capacity >= ceil(n/k) >= 1, so
    // k * capacity >= n_nodes and at least one partition always has room.
    let capacity =
        ((n_nodes as f64) * (1.0 + config.balance_tolerance) / (k as f64)).ceil() as usize;
    let mut assignments = vec![usize::MAX; n_nodes];
    let mut partition_sizes = vec![0usize; k];
    for node in 0..n_nodes {
        let mut best_partition = 0usize;
        let mut best_score = f64::NEG_INFINITY;
        for p in 0..k {
            // Skip partitions that already reached the capacity cap.
            if partition_sizes[p] >= capacity {
                continue;
            }
            // Neighbors already assigned to p; `nbr < node` holds exactly for
            // nodes processed earlier, since assignment is in index order.
            let neighbors_in_p = adj_list[node]
                .iter()
                .filter(|&&nbr| nbr < node && assignments[nbr] == p)
                .count();
            let load_factor = partition_sizes[p] as f64 / capacity as f64;
            let score = (neighbors_in_p as f64) * (1.0 - load_factor);
            // Ties break toward the currently smaller partition.
            if score > best_score
                || (score == best_score && partition_sizes[p] < partition_sizes[best_partition])
            {
                best_score = score;
                best_partition = p;
            }
        }
        assignments[node] = best_partition;
        partition_sizes[best_partition] += 1;
    }
    // Edge cut: deduplicate (u, v)/(v, u) so each undirected edge counts once.
    let mut edge_cut = 0usize;
    let mut seen_edges = std::collections::HashSet::new();
    for &(u, v) in edges {
        if u < n_nodes && v < n_nodes && u != v {
            let key = if u < v { (u, v) } else { (v, u) };
            if seen_edges.insert(key) && assignments[u] != assignments[v] {
                edge_cut += 1;
            }
        }
    }
    // Imbalance: worst relative deviation of a partition size from n/k.
    let ideal = n_nodes as f64 / k as f64;
    let imbalance = if ideal > 0.0 {
        partition_sizes
            .iter()
            .map(|&s| ((s as f64) - ideal).abs() / ideal)
            .fold(0.0f64, f64::max)
    } else {
        0.0
    };
    Ok(PartitionResult {
        assignments,
        edge_cut,
        partition_sizes,
        imbalance,
    })
}
/// Assign nodes to partitions round-robin (node `i` goes to `i % k`).
///
/// Produces maximally balanced sizes (they differ by at most one) but
/// ignores graph structure entirely; `edge_cut` is reported as 0 (it is not
/// computed here — use `evaluate_partition` to measure it).
pub fn hash_partition(n_nodes: usize, n_partitions: usize) -> PartitionResult {
    let assignments: Vec<usize> = (0..n_nodes).map(|i| i % n_partitions).collect();
    let mut partition_sizes = vec![0usize; n_partitions];
    for &p in &assignments {
        partition_sizes[p] += 1;
    }
    // Imbalance: worst relative deviation of any partition size from n/k.
    let ideal = n_nodes as f64 / n_partitions as f64;
    let imbalance = if ideal > 0.0 {
        partition_sizes
            .iter()
            .map(|&s| (s as f64 - ideal).abs() / ideal)
            .fold(0.0f64, f64::max)
    } else {
        0.0
    };
    PartitionResult {
        assignments,
        edge_cut: 0,
        partition_sizes,
        imbalance,
    }
}
/// Evaluate an existing assignment against a dense adjacency matrix,
/// returning `(edge_cut, imbalance)`.
///
/// Only the upper triangle `j > i` is scanned, so each undirected edge of a
/// symmetric matrix is counted once; entries with `|w| <= 1e-15` are treated
/// as absent. Assignments `>= n_partitions` are excluded from the size tally
/// but still participate in the cut comparison.
pub fn evaluate_partition(
    adj: &Array2<f64>,
    assignments: &[usize],
    n_partitions: usize,
) -> (usize, f64) {
    // Fix: clamp on ncols as well — for a non-square matrix the column index
    // j previously ranged up to nrows and could panic out of bounds.
    let n = adj.nrows().min(adj.ncols()).min(assignments.len());
    let mut partition_sizes = vec![0usize; n_partitions];
    for &a in &assignments[..n] {
        if a < n_partitions {
            partition_sizes[a] += 1;
        }
    }
    let mut edge_cut = 0usize;
    for i in 0..n {
        for j in (i + 1)..n {
            if adj[[i, j]].abs() > 1e-15 && assignments[i] != assignments[j] {
                edge_cut += 1;
            }
        }
    }
    // Imbalance: worst relative deviation from the ideal size n/k.
    let ideal = n as f64 / n_partitions as f64;
    let imbalance = if ideal > 0.0 {
        partition_sizes
            .iter()
            .map(|&s| ((s as f64) - ideal).abs() / ideal)
            .fold(0.0f64, f64::max)
    } else {
        0.0
    };
    (edge_cut, imbalance)
}
#[cfg(test)]
mod tests {
use super::*;
use scirs2_core::ndarray::Array2;
fn two_cliques_edges(n: usize) -> (Vec<(usize, usize)>, usize) {
let size = 2 * n;
let mut edges = Vec::new();
for i in 0..n {
for j in (i + 1)..n {
edges.push((i, j));
edges.push((j, i));
}
}
for i in n..size {
for j in (i + 1)..size {
edges.push((i, j));
edges.push((j, i));
}
}
edges.push((n - 1, n));
edges.push((n, n - 1));
(edges, size)
}
#[test]
fn test_ldg_better_than_hash_on_structured() {
let (edges, n_nodes) = two_cliques_edges(6);
let config = PartitionConfig {
n_partitions: 2,
balance_tolerance: 0.1,
..PartitionConfig::default()
};
let ldg_result = streaming_partition(&edges, n_nodes, &config).expect("LDG should succeed");
let mut adj = Array2::<f64>::zeros((n_nodes, n_nodes));
for &(u, v) in &edges {
adj[[u, v]] = 1.0;
}
let hash_result = hash_partition(n_nodes, 2);
let (hash_cut, _) = evaluate_partition(&adj, &hash_result.assignments, 2);
assert!(
ldg_result.edge_cut <= hash_cut + 2,
"LDG edge cut ({}) should be competitive with hash ({})",
ldg_result.edge_cut,
hash_cut
);
}
#[test]
fn test_hash_uniform_sizes() {
let n_nodes = 100;
let k = 4;
let result = hash_partition(n_nodes, k);
assert_eq!(result.partition_sizes.len(), k);
for &s in &result.partition_sizes {
assert_eq!(s, 25);
}
assert!(result.imbalance < 1e-10);
}
#[test]
fn test_hash_near_uniform_sizes() {
let n_nodes = 10;
let k = 3;
let result = hash_partition(n_nodes, k);
assert_eq!(result.partition_sizes.len(), k);
let total: usize = result.partition_sizes.iter().sum();
assert_eq!(total, n_nodes);
for &s in &result.partition_sizes {
assert!((3..=4).contains(&s));
}
}
#[test]
fn test_evaluate_partition() {
let n = 4;
let mut adj = Array2::<f64>::zeros((n, n));
adj[[0, 1]] = 1.0;
adj[[1, 0]] = 1.0;
adj[[2, 3]] = 1.0;
adj[[3, 2]] = 1.0;
adj[[1, 2]] = 1.0;
adj[[2, 1]] = 1.0;
let assignments = vec![0, 0, 1, 1];
let (cut, imbalance) = evaluate_partition(&adj, &assignments, 2);
assert_eq!(cut, 1); assert!(imbalance < 1e-10);
}
#[test]
fn test_single_node_trivial() {
let edges = vec![(0, 1), (1, 0)];
let config = PartitionConfig {
n_partitions: 2,
balance_tolerance: 0.5,
..PartitionConfig::default()
};
let result = streaming_partition(&edges, 2, &config).expect("should succeed");
assert_eq!(result.assignments.len(), 2);
assert!(result.assignments[0] < 2);
assert!(result.assignments[1] < 2);
let total: usize = result.partition_sizes.iter().sum();
assert_eq!(total, 2);
}
#[test]
fn test_streaming_invalid_params() {
let config = PartitionConfig {
n_partitions: 1,
..PartitionConfig::default()
};
assert!(streaming_partition(&[], 10, &config).is_err());
let config2 = PartitionConfig {
n_partitions: 5,
..PartitionConfig::default()
};
assert!(streaming_partition(&[], 3, &config2).is_err());
}
#[test]
fn test_edge_cut_computable() {
let (edges, n_nodes) = two_cliques_edges(4);
let mut adj = Array2::<f64>::zeros((n_nodes, n_nodes));
for &(u, v) in &edges {
adj[[u, v]] = 1.0;
}
let config = PartitionConfig {
n_partitions: 2,
balance_tolerance: 0.2,
..PartitionConfig::default()
};
let result = streaming_partition(&edges, n_nodes, &config).expect("should succeed");
let (eval_cut, _) = evaluate_partition(&adj, &result.assignments, 2);
assert_eq!(result.edge_cut, eval_cut);
}
}
/// Streaming (one-pass) vertex-assignment strategy used by
/// `StreamingPartitioner`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamingPartitionAlgorithm {
    /// Fennel heuristic: neighbor affinity minus a `gamma * size^alpha`
    /// load penalty.
    Fennel,
    /// Round-robin by vertex id (`v % k`); ignores graph structure.
    Hashing,
    /// Linear Deterministic Greedy: neighbor affinity scaled by `1 - load`.
    LinearDeterministic,
}
/// Configuration for `StreamingPartitioner`.
#[derive(Debug, Clone)]
pub struct StreamingPartitionConfig {
    // Number of partitions (k).
    pub n_parts: usize,
    // Which assignment heuristic to use.
    pub algorithm: StreamingPartitionAlgorithm,
    // Fennel load-penalty multiplier.
    pub gamma: f64,
    // Fennel load-penalty exponent; values <= 0 select an automatic value
    // at assignment time (see `assign_vertex`).
    pub alpha: f64,
}
impl Default for StreamingPartitionConfig {
    /// Defaults: 2 partitions, Fennel algorithm, `gamma = alpha = 1.5`.
    fn default() -> Self {
        Self {
            n_parts: 2,
            algorithm: StreamingPartitionAlgorithm::Fennel,
            gamma: 1.5,
            alpha: 1.5,
        }
    }
}
/// One-pass streaming graph partitioner: vertices are assigned to
/// partitions as they arrive via `assign_vertex`.
pub struct StreamingPartitioner {
    // Algorithm selection and parameters.
    config: StreamingPartitionConfig,
    // Per-vertex assignment; `None` until the vertex has been seen.
    partition: Vec<Option<usize>>,
    // Current vertex count of each partition.
    part_sizes: Vec<usize>,
    // Total number of vertices assigned so far.
    n_assigned: usize,
}
impl StreamingPartitioner {
    /// Create a partitioner for an expected `n_nodes` vertices with the
    /// given configuration. The vertex set may still grow later:
    /// `assign_vertex` resizes the table for out-of-range ids.
    pub fn new(n_nodes: usize, config: StreamingPartitionConfig) -> Self {
        let k = config.n_parts;
        Self {
            config,
            partition: vec![None; n_nodes],
            part_sizes: vec![0usize; k],
            n_assigned: 0,
        }
    }

    /// Assign vertex `v` given its (neighbor id, edge weight) list and
    /// return the chosen partition. Idempotent: an already-assigned vertex
    /// keeps its partition.
    ///
    /// The per-partition cap is recomputed each call as
    /// `ceil(1.05 * n / k)` over the current table length, so it adapts as
    /// the stream grows.
    ///
    /// NOTE(review): `n_parts == 0` would panic here (modulo by zero /
    /// empty `part_sizes`); `new` performs no validation — confirm callers
    /// guarantee `n_parts >= 1`.
    pub fn assign_vertex(&mut self, v: usize, neighbors: &[(usize, f64)]) -> usize {
        // Grow the vertex table on demand so ids beyond the initial size work.
        if v >= self.partition.len() {
            self.partition.resize(v + 1, None);
        }
        // Already placed: keep the existing assignment.
        if let Some(p) = self.partition[v] {
            return p;
        }
        let k = self.config.n_parts;
        let n_total = self.partition.len().max(1);
        // 5% slack over a perfectly balanced split.
        let cap = ((n_total as f64 * 1.05) / k as f64).ceil() as usize;
        let best_p = match self.config.algorithm {
            // Structure-oblivious round-robin by vertex id.
            StreamingPartitionAlgorithm::Hashing => v % k,
            // LDG: maximize (weight into p) * (1 - load), skipping full parts.
            StreamingPartitionAlgorithm::LinearDeterministic => {
                let mut best = 0usize;
                let mut best_score = f64::NEG_INFINITY;
                for p in 0..k {
                    if self.part_sizes[p] >= cap {
                        continue;
                    }
                    // Total edge weight from v into partition p; only
                    // already-assigned neighbors contribute.
                    let nbrs_in_p: f64 = neighbors
                        .iter()
                        .filter(|&&(nb, _)| {
                            nb < self.partition.len() && self.partition[nb] == Some(p)
                        })
                        .map(|&(_, w)| w)
                        .sum();
                    let load = self.part_sizes[p] as f64 / cap as f64;
                    let score = nbrs_in_p * (1.0 - load);
                    // Ties break toward the currently smaller partition.
                    if score > best_score
                        || (score == best_score && self.part_sizes[p] < self.part_sizes[best])
                    {
                        best_score = score;
                        best = p;
                    }
                }
                best
            }
            // Fennel: weight into p minus a gamma * size^alpha load penalty.
            StreamingPartitionAlgorithm::Fennel => {
                let gamma = self.config.gamma;
                // alpha <= 0 selects an automatic exponent; otherwise use
                // the configured value.
                let alpha = if self.config.alpha <= 0.0 {
                    (k as f64).sqrt() / (n_total as f64).max(1.0)
                } else {
                    self.config.alpha
                };
                let mut best = 0usize;
                let mut best_score = f64::NEG_INFINITY;
                for p in 0..k {
                    if self.part_sizes[p] >= cap {
                        continue;
                    }
                    let nbrs_in_p: f64 = neighbors
                        .iter()
                        .filter(|&&(nb, _)| {
                            nb < self.partition.len() && self.partition[nb] == Some(p)
                        })
                        .map(|&(_, w)| w)
                        .sum();
                    let penalty = gamma * (self.part_sizes[p] as f64).powf(alpha);
                    let score = nbrs_in_p - penalty;
                    if score > best_score
                        || (score == best_score && self.part_sizes[p] < self.part_sizes[best])
                    {
                        best_score = score;
                        best = p;
                    }
                }
                best
            }
        };
        // Record the assignment and update the running counters.
        self.partition[v] = Some(best_p);
        self.part_sizes[best_p] += 1;
        self.n_assigned += 1;
        best_p
    }

    /// Borrow the current per-vertex assignments (`None` = not yet seen).
    pub fn current_partition(&self) -> &[Option<usize>] {
        &self.partition
    }

    /// Count edges whose endpoints fall in different partitions.
    ///
    /// Only the `j > i` half is visited, so each undirected pair counts
    /// once; edges with an unassigned endpoint are skipped and weights are
    /// ignored. NOTE(review): assumes each edge appears at least as (i, j)
    /// with j > i — the symmetric adjacency built by the tests satisfies
    /// this; confirm for other callers.
    pub fn edge_cut_estimate(&self, adj: &[Vec<(usize, f64)>]) -> usize {
        let mut cut = 0usize;
        for (i, nbrs) in adj.iter().enumerate() {
            let pi = match self.partition.get(i).copied().flatten() {
                Some(p) => p,
                None => continue,
            };
            for &(j, _) in nbrs {
                if j <= i {
                    continue;
                }
                let pj = match self.partition.get(j).copied().flatten() {
                    Some(p) => p,
                    None => continue,
                };
                if pi != pj {
                    cut += 1;
                }
            }
        }
        cut
    }
}
#[cfg(test)]
mod streaming_partitioner_tests {
    use super::*;

    /// Symmetric adjacency list for the path graph 0-1-...-(n-1) with unit
    /// weights. NOTE(review): n == 0 underflows in `0..(n - 1)` and panics;
    /// all callers here use n >= 10.
    fn build_path_adj(n: usize) -> Vec<Vec<(usize, f64)>> {
        let mut adj = vec![vec![]; n];
        for i in 0..(n - 1) {
            adj[i].push((i + 1, 1.0));
            adj[i + 1].push((i, 1.0));
        }
        adj
    }

    // Fennel assigns every vertex of a path to an in-range partition.
    #[test]
    fn test_streaming_fennel_assignment() {
        let n = 20;
        let adj = build_path_adj(n);
        let config = StreamingPartitionConfig {
            n_parts: 4,
            algorithm: StreamingPartitionAlgorithm::Fennel,
            ..StreamingPartitionConfig::default()
        };
        let mut sp = StreamingPartitioner::new(n, config);
        for i in 0..n {
            let nbrs: Vec<(usize, f64)> = adj[i].clone();
            let p = sp.assign_vertex(i, &nbrs);
            assert!(p < 4, "part {} out of range", p);
        }
        // After the stream, no vertex should remain unassigned.
        for opt in sp.current_partition() {
            assert!(opt.is_some(), "node should be assigned");
        }
    }

    // Hashing is exact round-robin: 100 vertices / 4 parts = 25 each.
    #[test]
    fn test_streaming_hashing_uniform() {
        let n = 100;
        let config = StreamingPartitionConfig {
            n_parts: 4,
            algorithm: StreamingPartitionAlgorithm::Hashing,
            ..StreamingPartitionConfig::default()
        };
        let mut sp = StreamingPartitioner::new(n, config);
        for i in 0..n {
            sp.assign_vertex(i, &[]);
        }
        for &s in &sp.part_sizes {
            assert_eq!(s, 25, "hash partition should be uniform");
        }
    }

    // LDG assigns all vertices; partition sizes must sum to n.
    #[test]
    fn test_streaming_ldg_assigns_all() {
        let n = 30;
        let adj = build_path_adj(n);
        let config = StreamingPartitionConfig {
            n_parts: 3,
            algorithm: StreamingPartitionAlgorithm::LinearDeterministic,
            ..StreamingPartitionConfig::default()
        };
        let mut sp = StreamingPartitioner::new(n, config);
        for i in 0..n {
            let nbrs = adj[i].clone();
            let p = sp.assign_vertex(i, &nbrs);
            assert!(p < 3);
        }
        let total: usize = sp.part_sizes.iter().sum();
        assert_eq!(total, n);
    }

    // A path of n vertices has n-1 edges, so the cut can never exceed n.
    #[test]
    fn test_streaming_edge_cut_estimate() {
        let n = 10;
        let adj = build_path_adj(n);
        let config = StreamingPartitionConfig {
            n_parts: 2,
            algorithm: StreamingPartitionAlgorithm::Fennel,
            ..StreamingPartitionConfig::default()
        };
        let mut sp = StreamingPartitioner::new(n, config);
        for i in 0..n {
            let nbrs = adj[i].clone();
            sp.assign_vertex(i, &nbrs);
        }
        let cut = sp.edge_cut_estimate(&adj);
        assert!(cut <= n, "edge cut {} should be <= n={}", cut, n);
    }
}