Skip to main content

scirs2_graph/partitioning/
streaming.rs

//! Streaming graph partitioning for large-scale graphs.
//!
//! When a graph is too large to fit in memory or arrives as a stream of
//! edges, streaming partitioners assign nodes to partitions in a single
//! pass. This module provides:
//!
//! - **LDG (Linear Deterministic Greedy)**: assigns each arriving vertex
//!   to the partition that maximizes the number of neighbors already present,
//!   penalized by partition size to maintain balance.
//! - **Hash partitioning**: simple baseline that assigns nodes by hash.
//! - **Stateful streaming partitioner**: an online partitioner that assigns
//!   one vertex per call using a FENNEL, LDG, or hashing backend.
//! - **Partition evaluation**: edge cut and imbalance of an existing
//!   assignment against a dense adjacency matrix.
11
12use scirs2_core::ndarray::Array2;
13
14use crate::error::{GraphError, Result};
15
16use super::types::{PartitionConfig, PartitionResult};
17
18/// Linear Deterministic Greedy (LDG) streaming partitioner.
19///
20/// Processes nodes in order 0..n_nodes. For each node, it examines its
21/// edges (among those already seen) and assigns the node to the partition
22/// that maximizes:
23///
24///   score(p) = neighbors_in_p * (1 - |p| / capacity)
25///
26/// where capacity = ceil(n_nodes * (1 + balance_tolerance) / n_partitions).
27///
28/// # Arguments
29/// * `edges` - Edge list as (src, dst) pairs. Both directions should be
30///   included for undirected graphs.
31/// * `n_nodes` - Total number of nodes (nodes are indexed 0..n_nodes).
32/// * `config` - Partition configuration.
33///
34/// # Returns
35/// A `PartitionResult` with partition assignments for all nodes.
36///
37/// # Errors
38/// Returns `GraphError::InvalidParameter` if parameters are invalid.
39pub fn streaming_partition(
40    edges: &[(usize, usize)],
41    n_nodes: usize,
42    config: &PartitionConfig,
43) -> Result<PartitionResult> {
44    let k = config.n_partitions;
45
46    if k < 2 {
47        return Err(GraphError::InvalidParameter {
48            param: "n_partitions".to_string(),
49            value: format!("{}", k),
50            expected: "at least 2".to_string(),
51            context: "streaming_partition".to_string(),
52        });
53    }
54
55    if n_nodes < 2 {
56        return Err(GraphError::InvalidParameter {
57            param: "n_nodes".to_string(),
58            value: format!("{}", n_nodes),
59            expected: "at least 2".to_string(),
60            context: "streaming_partition".to_string(),
61        });
62    }
63
64    if k > n_nodes {
65        return Err(GraphError::InvalidParameter {
66            param: "n_partitions".to_string(),
67            value: format!("{}", k),
68            expected: format!("at most {} (number of nodes)", n_nodes),
69            context: "streaming_partition".to_string(),
70        });
71    }
72
73    // Build adjacency list from edge list
74    let mut adj_list: Vec<Vec<usize>> = vec![Vec::new(); n_nodes];
75    for &(u, v) in edges {
76        if u < n_nodes && v < n_nodes {
77            adj_list[u].push(v);
78        }
79    }
80
81    // Capacity per partition (with tolerance)
82    let capacity =
83        ((n_nodes as f64) * (1.0 + config.balance_tolerance) / (k as f64)).ceil() as usize;
84
85    let mut assignments = vec![usize::MAX; n_nodes];
86    let mut partition_sizes = vec![0usize; k];
87
88    // Process nodes in order
89    for node in 0..n_nodes {
90        let mut best_partition = 0usize;
91        let mut best_score = f64::NEG_INFINITY;
92
93        for p in 0..k {
94            if partition_sizes[p] >= capacity {
95                continue;
96            }
97
98            // Count neighbors already in partition p
99            let neighbors_in_p = adj_list[node]
100                .iter()
101                .filter(|&&nbr| nbr < node && assignments[nbr] == p)
102                .count();
103
104            // LDG scoring: neighbors * (1 - load_factor)
105            let load_factor = partition_sizes[p] as f64 / capacity as f64;
106            let score = (neighbors_in_p as f64) * (1.0 - load_factor);
107
108            // Tie-breaking: prefer less-loaded partition
109            if score > best_score
110                || (score == best_score && partition_sizes[p] < partition_sizes[best_partition])
111            {
112                best_score = score;
113                best_partition = p;
114            }
115        }
116
117        assignments[node] = best_partition;
118        partition_sizes[best_partition] += 1;
119    }
120
121    // Compute edge cut
122    let mut edge_cut = 0usize;
123    let mut seen_edges = std::collections::HashSet::new();
124    for &(u, v) in edges {
125        if u < n_nodes && v < n_nodes && u != v {
126            let key = if u < v { (u, v) } else { (v, u) };
127            if seen_edges.insert(key) && assignments[u] != assignments[v] {
128                edge_cut += 1;
129            }
130        }
131    }
132
133    // Compute imbalance
134    let ideal = n_nodes as f64 / k as f64;
135    let imbalance = if ideal > 0.0 {
136        partition_sizes
137            .iter()
138            .map(|&s| ((s as f64) - ideal).abs() / ideal)
139            .fold(0.0f64, f64::max)
140    } else {
141        0.0
142    };
143
144    Ok(PartitionResult {
145        assignments,
146        edge_cut,
147        partition_sizes,
148        imbalance,
149    })
150}
151
152/// Hash-based streaming partition (baseline).
153///
154/// Assigns each node to partition `node % n_partitions`. This is the simplest
155/// possible partitioner and serves as a baseline for comparison.
156///
157/// # Arguments
158/// * `n_nodes` - Number of nodes (0-indexed).
159/// * `n_partitions` - Number of partitions.
160///
161/// # Returns
162/// A `PartitionResult` with uniform (or near-uniform) partition sizes.
163pub fn hash_partition(n_nodes: usize, n_partitions: usize) -> PartitionResult {
164    let mut assignments = vec![0usize; n_nodes];
165    let mut partition_sizes = vec![0usize; n_partitions];
166
167    for i in 0..n_nodes {
168        let p = i % n_partitions;
169        assignments[i] = p;
170        partition_sizes[p] += 1;
171    }
172
173    let ideal = n_nodes as f64 / n_partitions as f64;
174    let imbalance = if ideal > 0.0 {
175        partition_sizes
176            .iter()
177            .map(|&s| ((s as f64) - ideal).abs() / ideal)
178            .fold(0.0f64, f64::max)
179    } else {
180        0.0
181    };
182
183    PartitionResult {
184        assignments,
185        edge_cut: 0, // Not computed without adjacency info
186        partition_sizes,
187        imbalance,
188    }
189}
190
191/// Evaluate a partition against an adjacency matrix.
192///
193/// Computes the edge cut (number of edges crossing partitions) and
194/// the imbalance ratio.
195///
196/// # Arguments
197/// * `adj` - Symmetric adjacency matrix (n x n).
198/// * `assignments` - Partition assignment for each node.
199/// * `n_partitions` - Number of partitions.
200///
201/// # Returns
202/// A tuple `(edge_cut, imbalance)`.
203pub fn evaluate_partition(
204    adj: &Array2<f64>,
205    assignments: &[usize],
206    n_partitions: usize,
207) -> (usize, f64) {
208    let n = adj.nrows().min(assignments.len());
209
210    let mut partition_sizes = vec![0usize; n_partitions];
211    for &a in &assignments[..n] {
212        if a < n_partitions {
213            partition_sizes[a] += 1;
214        }
215    }
216
217    let mut edge_cut = 0usize;
218    for i in 0..n {
219        for j in (i + 1)..n {
220            if adj[[i, j]].abs() > 1e-15 && assignments[i] != assignments[j] {
221                edge_cut += 1;
222            }
223        }
224    }
225
226    let ideal = n as f64 / n_partitions as f64;
227    let imbalance = if ideal > 0.0 {
228        partition_sizes
229            .iter()
230            .map(|&s| ((s as f64) - ideal).abs() / ideal)
231            .fold(0.0f64, f64::max)
232    } else {
233        0.0
234    };
235
236    (edge_cut, imbalance)
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::Array2;

    /// Build the edge list (both directions) for two `n`-cliques joined by a
    /// single bridge between nodes `n - 1` and `n`. Returns the edges and the
    /// total node count `2 * n`.
    fn two_cliques_edges(n: usize) -> (Vec<(usize, usize)>, usize) {
        let size = 2 * n;
        let mut edges = Vec::new();
        // Clique 1 occupies 0..n, clique 2 occupies n..size.
        for (lo, hi) in [(0, n), (n, size)] {
            for i in lo..hi {
                for j in (i + 1)..hi {
                    edges.push((i, j));
                    edges.push((j, i));
                }
            }
        }
        // Bridge
        edges.push((n - 1, n));
        edges.push((n, n - 1));
        (edges, size)
    }

    /// Dense 0/1 adjacency matrix for an edge list.
    fn adjacency_from_edges(edges: &[(usize, usize)], n_nodes: usize) -> Array2<f64> {
        let mut adj = Array2::<f64>::zeros((n_nodes, n_nodes));
        for &(u, v) in edges {
            adj[[u, v]] = 1.0;
        }
        adj
    }

    #[test]
    fn test_ldg_better_than_hash_on_structured() {
        let (edges, n_nodes) = two_cliques_edges(6);
        let config = PartitionConfig {
            n_partitions: 2,
            balance_tolerance: 0.1,
            ..PartitionConfig::default()
        };

        let ldg_result = streaming_partition(&edges, n_nodes, &config).expect("LDG should succeed");

        let adj = adjacency_from_edges(&edges, n_nodes);
        let hash_result = hash_partition(n_nodes, 2);
        let (hash_cut, _) = evaluate_partition(&adj, &hash_result.assignments, 2);

        // Modulo-2 hashing splits both cliques down the middle, so LDG must
        // not do worse than it on this graph.
        assert!(
            ldg_result.edge_cut <= hash_cut,
            "LDG edge cut ({}) should not exceed hash edge cut ({})",
            ldg_result.edge_cut,
            hash_cut
        );
    }

    #[test]
    fn test_hash_uniform_sizes() {
        let n_nodes = 100;
        let k = 4;
        let result = hash_partition(n_nodes, k);
        assert_eq!(result.partition_sizes.len(), k);
        // Each partition should have 25 nodes
        for &s in &result.partition_sizes {
            assert_eq!(s, 25);
        }
        assert!(result.imbalance < 1e-10);
    }

    #[test]
    fn test_hash_near_uniform_sizes() {
        let n_nodes = 10;
        let k = 3;
        let result = hash_partition(n_nodes, k);
        assert_eq!(result.partition_sizes.len(), k);
        // 10 nodes over 3 partitions: every size is 3 or 4 and they sum to 10.
        let total: usize = result.partition_sizes.iter().sum();
        assert_eq!(total, n_nodes);
        for &s in &result.partition_sizes {
            assert!((3..=4).contains(&s));
        }
    }

    #[test]
    fn test_evaluate_partition() {
        // Path 0 - 1 - 2 - 3, stored symmetrically.
        let adj = adjacency_from_edges(&[(0, 1), (1, 0), (2, 3), (3, 2), (1, 2), (2, 1)], 4);

        let assignments = vec![0, 0, 1, 1];
        let (cut, imbalance) = evaluate_partition(&adj, &assignments, 2);
        assert_eq!(cut, 1); // only edge 1-2 crosses
        assert!(imbalance < 1e-10);
    }

    #[test]
    fn test_two_nodes_two_partitions() {
        // Smallest input the partitioner accepts: 2 nodes, 2 partitions.
        let edges = vec![(0, 1), (1, 0)];
        let config = PartitionConfig {
            n_partitions: 2,
            balance_tolerance: 0.5,
            ..PartitionConfig::default()
        };
        let result = streaming_partition(&edges, 2, &config).expect("should succeed");
        assert_eq!(result.assignments.len(), 2);
        // Both nodes should be assigned (valid partition IDs)
        assert!(result.assignments.iter().all(|&p| p < 2));
        // Total nodes accounted for
        let total: usize = result.partition_sizes.iter().sum();
        assert_eq!(total, 2);
    }

    #[test]
    fn test_streaming_invalid_params() {
        // Fewer than two partitions is rejected.
        let config = PartitionConfig {
            n_partitions: 1,
            ..PartitionConfig::default()
        };
        assert!(streaming_partition(&[], 10, &config).is_err());

        // More partitions than nodes is rejected.
        let config2 = PartitionConfig {
            n_partitions: 5,
            ..PartitionConfig::default()
        };
        assert!(streaming_partition(&[], 3, &config2).is_err());
    }

    #[test]
    fn test_edge_cut_computable() {
        let (edges, n_nodes) = two_cliques_edges(4);
        let adj = adjacency_from_edges(&edges, n_nodes);

        let config = PartitionConfig {
            n_partitions: 2,
            balance_tolerance: 0.2,
            ..PartitionConfig::default()
        };
        let result = streaming_partition(&edges, n_nodes, &config).expect("should succeed");

        // Verify edge cut matches evaluate_partition
        let (eval_cut, _) = evaluate_partition(&adj, &result.assignments, 2);
        assert_eq!(result.edge_cut, eval_cut);
    }
}
395
396// ============================================================================
397// Stateful streaming partitioner with FENNEL / Hashing / LDG backends
398// ============================================================================
399
/// Algorithm variant for the stateful streaming partitioner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamingPartitionAlgorithm {
    /// FENNEL: minimize edge cut + imbalance penalty, vertex streaming.
    /// Score for partition p: `|N(v) ∩ Vp| - γ · |Vp|^α`.
    Fennel,
    /// Consistent hashing baseline: `v % k`.
    Hashing,
    /// Linear Deterministic Greedy: `|N(v) ∩ Vp| · (1 - |Vp| / cap)`.
    LinearDeterministic,
}

/// Configuration for the stateful streaming partitioner.
#[derive(Debug, Clone)]
pub struct StreamingPartitionConfig {
    /// Number of target partitions. Intended to be >= 2;
    /// [`StreamingPartitioner::new`] raises a value of 0 to 1 so that
    /// assignment never divides by zero.
    pub n_parts: usize,
    /// Algorithm variant. Default: `Fennel`.
    pub algorithm: StreamingPartitionAlgorithm,
    /// Balance penalty weight γ for FENNEL. Default 1.5.
    pub gamma: f64,
    /// FENNEL exponent α (default 1.5, higher = stronger balance enforcement).
    /// Any value `<= 0.0` selects an automatic exponent of
    /// `sqrt(n_parts) / n_nodes`, where `n_nodes` is the number of node slots
    /// currently tracked by the partitioner.
    // NOTE(review): the automatic value is applied as the *exponent*, which
    // makes the penalty almost flat for large graphs — confirm this is the
    // intended heuristic.
    pub alpha: f64,
}

impl Default for StreamingPartitionConfig {
    fn default() -> Self {
        Self {
            n_parts: 2,
            algorithm: StreamingPartitionAlgorithm::Fennel,
            gamma: 1.5,
            alpha: 1.5,
        }
    }
}

/// A stateful online streaming partitioner.
///
/// Nodes arrive one at a time (with their adjacency list as observed so far).
/// Each call to [`assign_vertex`](StreamingPartitioner::assign_vertex) makes an
/// irrevocable assignment based on already-assigned neighbors.
///
/// # Example
///
/// ```rust,no_run
/// use scirs2_graph::partitioning::{StreamingPartitioner, StreamingPartitionConfig, StreamingPartitionAlgorithm};
///
/// let config = StreamingPartitionConfig { n_parts: 3, ..Default::default() };
/// let mut sp = StreamingPartitioner::new(10, config);
/// // Assign node 0 with no neighbors yet
/// let part = sp.assign_vertex(0, &[]);
/// assert!(part < 3);
/// ```
#[derive(Debug, Clone)]
pub struct StreamingPartitioner {
    config: StreamingPartitionConfig,
    /// Partition assignment for each node; `None` means not yet assigned.
    partition: Vec<Option<usize>>,
    /// Number of nodes in each partition.
    part_sizes: Vec<usize>,
    /// Total nodes processed so far.
    n_assigned: usize,
}

impl StreamingPartitioner {
    /// Create a new streaming partitioner for `n_nodes` nodes.
    ///
    /// `n_nodes` is only an initial size: the node table grows on demand when
    /// a larger vertex id is assigned. A `config.n_parts` of 0 is treated as 1.
    pub fn new(n_nodes: usize, mut config: StreamingPartitionConfig) -> Self {
        // Guarantee at least one partition so `v % k` and `part_sizes[p]`
        // are always well-defined.
        config.n_parts = config.n_parts.max(1);
        let part_sizes = vec![0usize; config.n_parts];
        Self {
            config,
            partition: vec![None; n_nodes],
            part_sizes,
            n_assigned: 0,
        }
    }

    /// Assign vertex `v` to a partition given its (already-known) neighbors.
    ///
    /// `neighbors` holds `(neighbor_id, edge_weight)` pairs. Only neighbors
    /// whose partition is already determined influence the score; unknown or
    /// unassigned ids are ignored. Calling this again for an already assigned
    /// vertex returns the existing assignment without changing any state.
    ///
    /// For the LDG and FENNEL variants, partitions that have reached the
    /// capacity `ceil(1.05 * n / k)` (`n` = tracked node slots) are skipped,
    /// and ties go to the smaller partition, then the lower index.
    ///
    /// Returns the assigned partition ID.
    pub fn assign_vertex(&mut self, v: usize, neighbors: &[(usize, f64)]) -> usize {
        if v >= self.partition.len() {
            // Grow if needed
            self.partition.resize(v + 1, None);
        }

        // If already assigned, return current assignment
        if let Some(p) = self.partition[v] {
            return p;
        }

        let k = self.config.n_parts;
        let best_p = match self.config.algorithm {
            StreamingPartitionAlgorithm::Hashing => v % k,
            StreamingPartitionAlgorithm::LinearDeterministic => {
                let cap = self.capacity();
                let weights = self.neighbor_weight_per_part(neighbors);
                self.best_open_part(cap, |p, size| {
                    let load = size as f64 / cap as f64;
                    weights[p] * (1.0 - load)
                })
            }
            StreamingPartitionAlgorithm::Fennel => {
                // FENNEL score: |N(v) ∩ Vp| - γ · |Vp|^α
                let cap = self.capacity();
                let gamma = self.config.gamma;
                let alpha = if self.config.alpha <= 0.0 {
                    // Auto: sqrt(k) / n
                    let n_total = self.partition.len().max(1);
                    (k as f64).sqrt() / (n_total as f64).max(1.0)
                } else {
                    self.config.alpha
                };
                let weights = self.neighbor_weight_per_part(neighbors);
                self.best_open_part(cap, |p, size| {
                    let penalty = gamma * (size as f64).powf(alpha);
                    weights[p] - penalty
                })
            }
        };

        self.partition[v] = Some(best_p);
        self.part_sizes[best_p] += 1;
        self.n_assigned += 1;
        best_p
    }

    /// Return the current partition assignments (`None` = not yet assigned).
    pub fn current_partition(&self) -> &[Option<usize>] {
        &self.partition
    }

    /// Return the number of vertices currently held by each partition.
    pub fn part_sizes(&self) -> &[usize] {
        &self.part_sizes
    }

    /// Return how many distinct vertices have been assigned so far.
    pub fn n_assigned(&self) -> usize {
        self.n_assigned
    }

    /// Estimate the edge cut from the current partition state.
    ///
    /// `adj[i]` lists the `(neighbor_id, weight)` pairs of node `i`. Only
    /// edges where both endpoints are assigned and in different parts are
    /// counted, and only entries with `neighbor_id > i` are examined, so an
    /// undirected edge is counted once provided it is listed from its
    /// lower-numbered endpoint. Weights are ignored; repeated entries are
    /// counted repeatedly.
    pub fn edge_cut_estimate(&self, adj: &[Vec<(usize, f64)>]) -> usize {
        let part_of = |node: usize| self.partition.get(node).copied().flatten();

        let mut cut = 0usize;
        for (i, nbrs) in adj.iter().enumerate() {
            let pi = match part_of(i) {
                Some(p) => p,
                None => continue,
            };
            for &(j, _) in nbrs {
                if j <= i {
                    continue; // count each edge once
                }
                if let Some(pj) = part_of(j) {
                    if pi != pj {
                        cut += 1;
                    }
                }
            }
        }
        cut
    }

    /// Per-partition capacity: the ideal size with 5% slack, rounded up.
    fn capacity(&self) -> usize {
        let n_total = self.partition.len().max(1);
        ((n_total as f64 * 1.05) / self.config.n_parts as f64).ceil() as usize
    }

    /// Sum the edge weights towards each partition in a single pass over
    /// `neighbors`, skipping neighbors that are unknown or not yet assigned.
    fn neighbor_weight_per_part(&self, neighbors: &[(usize, f64)]) -> Vec<f64> {
        let mut weights = vec![0.0f64; self.part_sizes.len()];
        for &(nb, w) in neighbors {
            if let Some(p) = self.partition.get(nb).copied().flatten() {
                weights[p] += w;
            }
        }
        weights
    }

    /// Pick the best-scoring partition among those below `cap`.
    ///
    /// `score_of(p, size)` returns the score of partition `p` holding `size`
    /// vertices. Ties prefer the smaller partition, then the lower index. The
    /// first open partition is always a candidate, so a NaN score can never
    /// route a vertex into a partition that is already full. If no partition
    /// is open, the least-loaded one is returned.
    fn best_open_part(&self, cap: usize, score_of: impl Fn(usize, usize) -> f64) -> usize {
        let mut best: Option<(usize, f64)> = None;
        for (p, &size) in self.part_sizes.iter().enumerate() {
            if size >= cap {
                continue;
            }
            let score = score_of(p, size);
            let improves = match best {
                None => true,
                Some((best_p, best_score)) => {
                    score > best_score
                        || (score == best_score && size < self.part_sizes[best_p])
                }
            };
            if improves {
                best = Some((p, score));
            }
        }

        match best {
            Some((p, _)) => p,
            None => self
                .part_sizes
                .iter()
                .enumerate()
                .min_by_key(|&(_, &size)| size)
                .map_or(0, |(p, _)| p),
        }
    }
}
598
#[cfg(test)]
mod streaming_partitioner_tests {
    use super::*;

    /// Symmetric unit-weight adjacency lists for the path 0 - 1 - ... - (n-1).
    fn build_path_adj(n: usize) -> Vec<Vec<(usize, f64)>> {
        let mut adj = vec![Vec::new(); n];
        // Starting at 1 avoids the `n - 1` underflow for an empty path.
        for i in 1..n {
            adj[i - 1].push((i, 1.0));
            adj[i].push((i - 1, 1.0));
        }
        adj
    }

    #[test]
    fn test_streaming_fennel_assignment() {
        let n = 20;
        let adj = build_path_adj(n);
        let config = StreamingPartitionConfig {
            n_parts: 4,
            algorithm: StreamingPartitionAlgorithm::Fennel,
            ..StreamingPartitionConfig::default()
        };
        let mut sp = StreamingPartitioner::new(n, config);

        for (i, nbrs) in adj.iter().enumerate() {
            let p = sp.assign_vertex(i, nbrs);
            assert!(p < 4, "part {} out of range", p);
        }

        // All nodes assigned
        for opt in sp.current_partition() {
            assert!(opt.is_some(), "node should be assigned");
        }
    }

    #[test]
    fn test_streaming_hashing_uniform() {
        let n = 100;
        let config = StreamingPartitionConfig {
            n_parts: 4,
            algorithm: StreamingPartitionAlgorithm::Hashing,
            ..StreamingPartitionConfig::default()
        };
        let mut sp = StreamingPartitioner::new(n, config);
        for i in 0..n {
            sp.assign_vertex(i, &[]);
        }
        // Each part should have exactly 25 nodes
        for &s in &sp.part_sizes {
            assert_eq!(s, 25, "hash partition should be uniform");
        }
    }

    #[test]
    fn test_streaming_ldg_assigns_all() {
        let n = 30;
        let adj = build_path_adj(n);
        let config = StreamingPartitionConfig {
            n_parts: 3,
            algorithm: StreamingPartitionAlgorithm::LinearDeterministic,
            ..StreamingPartitionConfig::default()
        };
        let mut sp = StreamingPartitioner::new(n, config);
        for (i, nbrs) in adj.iter().enumerate() {
            let p = sp.assign_vertex(i, nbrs);
            assert!(p < 3);
        }
        let total: usize = sp.part_sizes.iter().sum();
        assert_eq!(total, n);
    }

    #[test]
    fn test_streaming_edge_cut_estimate() {
        let n = 10;
        let adj = build_path_adj(n);
        let config = StreamingPartitionConfig {
            n_parts: 2,
            algorithm: StreamingPartitionAlgorithm::Fennel,
            ..StreamingPartitionConfig::default()
        };
        let mut sp = StreamingPartitioner::new(n, config);
        for (i, nbrs) in adj.iter().enumerate() {
            sp.assign_vertex(i, nbrs);
        }
        let cut = sp.edge_cut_estimate(&adj);
        // A path on n nodes has n - 1 edges, which bounds the cut from above.
        // FENNEL's size penalty keeps both parts non-empty on this connected
        // graph, so at least one edge must cross.
        assert!(
            (1..n).contains(&cut),
            "edge cut {} should be within 1..={}",
            cut,
            n - 1
        );
    }
}