Skip to main content

ruvector_mincut/optimization/
parallel.rs

1//! Parallel Level Updates with Work-Stealing
2//!
3//! Provides efficient parallel computation for j-tree levels:
4//! - Rayon-based parallel iteration
5//! - Work-stealing for load balancing
6//! - Lock-free result aggregation
7//! - Adaptive parallelism based on workload
8//!
9//! Target: Near-linear speedup for independent level updates
10
11use crate::graph::VertexId;
12use std::collections::{HashMap, HashSet};
13use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex, RwLock};
15
16#[cfg(feature = "rayon")]
17use rayon::prelude::*;
18
19/// Configuration for parallel level updates
20#[derive(Debug, Clone)]
21pub struct ParallelConfig {
22    /// Minimum workload to use parallelism
23    pub min_parallel_size: usize,
24    /// Number of threads (0 = auto-detect)
25    pub num_threads: usize,
26    /// Enable work-stealing
27    pub work_stealing: bool,
28    /// Chunk size for parallel iteration
29    pub chunk_size: usize,
30    /// Enable adaptive parallelism
31    pub adaptive: bool,
32}
33
34impl Default for ParallelConfig {
35    fn default() -> Self {
36        Self {
37            min_parallel_size: 100,
38            num_threads: 0, // Auto-detect
39            work_stealing: true,
40            chunk_size: 64,
41            adaptive: true,
42        }
43    }
44}
45
46/// Work item for parallel processing
47#[derive(Debug, Clone)]
48pub struct WorkItem {
49    /// Level index
50    pub level: usize,
51    /// Vertices to process
52    pub vertices: Vec<VertexId>,
53    /// Priority (lower = higher priority)
54    pub priority: u32,
55    /// Estimated work units
56    pub estimated_work: usize,
57}
58
59/// Result from parallel level update
60#[derive(Debug, Clone)]
61pub struct LevelUpdateResult {
62    /// Level index
63    pub level: usize,
64    /// Computed cut value
65    pub cut_value: f64,
66    /// Partition (vertices on one side)
67    pub partition: HashSet<VertexId>,
68    /// Time taken in microseconds
69    pub time_us: u64,
70}
71
72/// Work-stealing scheduler for parallel level processing
73pub struct WorkStealingScheduler {
74    config: ParallelConfig,
75    /// Work queue
76    work_queue: RwLock<Vec<WorkItem>>,
77    /// Completed results
78    results: RwLock<HashMap<usize, LevelUpdateResult>>,
79    /// Active workers count
80    active_workers: AtomicUsize,
81    /// Total work processed
82    total_work: AtomicU64,
83    /// Steal count
84    steals: AtomicU64,
85}
86
87impl WorkStealingScheduler {
88    /// Create new scheduler with default config
89    pub fn new() -> Self {
90        Self::with_config(ParallelConfig::default())
91    }
92
93    /// Create with custom config
94    pub fn with_config(config: ParallelConfig) -> Self {
95        Self {
96            config,
97            work_queue: RwLock::new(Vec::new()),
98            results: RwLock::new(HashMap::new()),
99            active_workers: AtomicUsize::new(0),
100            total_work: AtomicU64::new(0),
101            steals: AtomicU64::new(0),
102        }
103    }
104
105    /// Submit work item
106    pub fn submit(&self, item: WorkItem) {
107        let mut queue = self.work_queue.write().unwrap();
108        let estimated_work = item.estimated_work;
109        queue.push(item);
110
111        // Sort by priority (ascending)
112        queue.sort_by_key(|w| w.priority);
113
114        self.total_work
115            .fetch_add(estimated_work as u64, Ordering::Relaxed);
116    }
117
118    /// Submit multiple work items
119    pub fn submit_batch(&self, items: Vec<WorkItem>) {
120        let mut queue = self.work_queue.write().unwrap();
121
122        for item in items {
123            self.total_work
124                .fetch_add(item.estimated_work as u64, Ordering::Relaxed);
125            queue.push(item);
126        }
127
128        // Sort by priority (ascending)
129        queue.sort_by_key(|w| w.priority);
130    }
131
132    /// Try to steal work from queue
133    pub fn steal(&self) -> Option<WorkItem> {
134        let mut queue = self.work_queue.write().unwrap();
135
136        if queue.is_empty() {
137            return None;
138        }
139
140        self.steals.fetch_add(1, Ordering::Relaxed);
141
142        // Steal from front (highest priority)
143        Some(queue.remove(0))
144    }
145
146    /// Record result
147    pub fn complete(&self, result: LevelUpdateResult) {
148        let mut results = self.results.write().unwrap();
149        results.insert(result.level, result);
150    }
151
152    /// Get all results
153    pub fn get_results(&self) -> HashMap<usize, LevelUpdateResult> {
154        self.results.read().unwrap().clone()
155    }
156
157    /// Clear results
158    pub fn clear_results(&self) {
159        self.results.write().unwrap().clear();
160    }
161
162    /// Check if queue is empty
163    pub fn is_empty(&self) -> bool {
164        self.work_queue.read().unwrap().is_empty()
165    }
166
167    /// Get queue size
168    pub fn queue_size(&self) -> usize {
169        self.work_queue.read().unwrap().len()
170    }
171
172    /// Get total steals
173    pub fn steal_count(&self) -> u64 {
174        self.steals.load(Ordering::Relaxed)
175    }
176}
177
178impl Default for WorkStealingScheduler {
179    fn default() -> Self {
180        Self::new()
181    }
182}
183
184/// Parallel level updater using Rayon
185pub struct ParallelLevelUpdater {
186    config: ParallelConfig,
187    /// Scheduler for work-stealing
188    scheduler: Arc<WorkStealingScheduler>,
189    /// Global minimum cut found
190    global_min: AtomicU64,
191    /// Level with global minimum
192    best_level: AtomicUsize,
193}
194
195impl ParallelLevelUpdater {
196    /// Create new parallel updater with default config
197    pub fn new() -> Self {
198        Self::with_config(ParallelConfig::default())
199    }
200
201    /// Create with custom config
202    pub fn with_config(config: ParallelConfig) -> Self {
203        Self {
204            scheduler: Arc::new(WorkStealingScheduler::with_config(config.clone())),
205            config,
206            global_min: AtomicU64::new(f64::INFINITY.to_bits()),
207            best_level: AtomicUsize::new(usize::MAX),
208        }
209    }
210
211    /// Update global minimum atomically
212    pub fn try_update_min(&self, value: f64, level: usize) -> bool {
213        let value_bits = value.to_bits();
214        let mut current = self.global_min.load(Ordering::Acquire);
215
216        loop {
217            let current_value = f64::from_bits(current);
218            if value >= current_value {
219                return false;
220            }
221
222            match self.global_min.compare_exchange_weak(
223                current,
224                value_bits,
225                Ordering::AcqRel,
226                Ordering::Acquire,
227            ) {
228                Ok(_) => {
229                    self.best_level.store(level, Ordering::Release);
230                    return true;
231                }
232                Err(c) => current = c,
233            }
234        }
235    }
236
237    /// Get current global minimum
238    pub fn global_min(&self) -> f64 {
239        f64::from_bits(self.global_min.load(Ordering::Acquire))
240    }
241
242    /// Get best level
243    pub fn best_level(&self) -> Option<usize> {
244        let level = self.best_level.load(Ordering::Acquire);
245        if level == usize::MAX {
246            None
247        } else {
248            Some(level)
249        }
250    }
251
252    /// Reset global minimum
253    pub fn reset_min(&self) {
254        self.global_min
255            .store(f64::INFINITY.to_bits(), Ordering::Release);
256        self.best_level.store(usize::MAX, Ordering::Release);
257    }
258
259    /// Process levels in parallel using Rayon
260    #[cfg(feature = "rayon")]
261    pub fn process_parallel<F>(&self, levels: &[usize], mut process_fn: F) -> Vec<LevelUpdateResult>
262    where
263        F: FnMut(usize) -> LevelUpdateResult + Send + Sync + Clone,
264    {
265        let size = levels.len();
266
267        if size < self.config.min_parallel_size {
268            // Sequential processing for small workloads
269            return levels
270                .iter()
271                .map(|&level| {
272                    let result = process_fn.clone()(level);
273                    self.try_update_min(result.cut_value, level);
274                    result
275                })
276                .collect();
277        }
278
279        // Parallel processing with Rayon
280        levels
281            .par_iter()
282            .map(|&level| {
283                let result = process_fn.clone()(level);
284                self.try_update_min(result.cut_value, level);
285                result
286            })
287            .collect()
288    }
289
290    /// Process levels in parallel (scalar fallback)
291    #[cfg(not(feature = "rayon"))]
292    pub fn process_parallel<F>(&self, levels: &[usize], mut process_fn: F) -> Vec<LevelUpdateResult>
293    where
294        F: FnMut(usize) -> LevelUpdateResult + Clone,
295    {
296        levels
297            .iter()
298            .map(|&level| {
299                let result = process_fn.clone()(level);
300                self.try_update_min(result.cut_value, level);
301                result
302            })
303            .collect()
304    }
305
306    /// Process work items with work-stealing
307    #[cfg(feature = "rayon")]
308    pub fn process_with_stealing<F>(
309        &self,
310        work_items: Vec<WorkItem>,
311        process_fn: F,
312    ) -> Vec<LevelUpdateResult>
313    where
314        F: Fn(&WorkItem) -> LevelUpdateResult + Send + Sync,
315    {
316        if work_items.len() < self.config.min_parallel_size {
317            // Sequential
318            return work_items
319                .iter()
320                .map(|item| {
321                    let result = process_fn(item);
322                    self.try_update_min(result.cut_value, item.level);
323                    result
324                })
325                .collect();
326        }
327
328        // Parallel with work-stealing
329        work_items
330            .par_iter()
331            .map(|item| {
332                let result = process_fn(item);
333                self.try_update_min(result.cut_value, item.level);
334                result
335            })
336            .collect()
337    }
338
339    /// Process work items (scalar fallback)
340    #[cfg(not(feature = "rayon"))]
341    pub fn process_with_stealing<F>(
342        &self,
343        work_items: Vec<WorkItem>,
344        process_fn: F,
345    ) -> Vec<LevelUpdateResult>
346    where
347        F: Fn(&WorkItem) -> LevelUpdateResult,
348    {
349        work_items
350            .iter()
351            .map(|item| {
352                let result = process_fn(item);
353                self.try_update_min(result.cut_value, item.level);
354                result
355            })
356            .collect()
357    }
358
359    /// Batch vertex processing within a level
360    #[cfg(feature = "rayon")]
361    pub fn process_vertices_parallel<F, R>(&self, vertices: &[VertexId], process_fn: F) -> Vec<R>
362    where
363        F: Fn(VertexId) -> R + Send + Sync,
364        R: Send,
365    {
366        if vertices.len() < self.config.min_parallel_size {
367            return vertices.iter().map(|&v| process_fn(v)).collect();
368        }
369
370        vertices.par_iter().map(|&v| process_fn(v)).collect()
371    }
372
373    /// Batch vertex processing (scalar fallback)
374    #[cfg(not(feature = "rayon"))]
375    pub fn process_vertices_parallel<F, R>(&self, vertices: &[VertexId], process_fn: F) -> Vec<R>
376    where
377        F: Fn(VertexId) -> R,
378    {
379        vertices.iter().map(|&v| process_fn(v)).collect()
380    }
381
382    /// Parallel reduction for aggregating results
383    #[cfg(feature = "rayon")]
384    pub fn parallel_reduce<T, F, R>(
385        &self,
386        items: &[T],
387        identity: R,
388        map_fn: F,
389        reduce_fn: fn(R, R) -> R,
390    ) -> R
391    where
392        T: Sync,
393        F: Fn(&T) -> R + Send + Sync,
394        R: Send + Clone,
395    {
396        if items.len() < self.config.min_parallel_size {
397            return items
398                .iter()
399                .map(|item| map_fn(item))
400                .fold(identity.clone(), reduce_fn);
401        }
402
403        items
404            .par_iter()
405            .map(|item| map_fn(item))
406            .reduce(|| identity.clone(), reduce_fn)
407    }
408
409    /// Parallel reduction (scalar fallback)
410    #[cfg(not(feature = "rayon"))]
411    pub fn parallel_reduce<T, F, R>(
412        &self,
413        items: &[T],
414        identity: R,
415        map_fn: F,
416        reduce_fn: fn(R, R) -> R,
417    ) -> R
418    where
419        F: Fn(&T) -> R,
420        R: Clone,
421    {
422        items
423            .iter()
424            .map(|item| map_fn(item))
425            .fold(identity, reduce_fn)
426    }
427
428    /// Get scheduler reference
429    pub fn scheduler(&self) -> &Arc<WorkStealingScheduler> {
430        &self.scheduler
431    }
432}
433
434impl Default for ParallelLevelUpdater {
435    fn default() -> Self {
436        Self::new()
437    }
438}
439
440/// Parallel cut computation helpers
441pub struct ParallelCutOps;
442
443impl ParallelCutOps {
444    /// Compute boundary size in parallel
445    #[cfg(feature = "rayon")]
446    pub fn boundary_size_parallel(
447        partition: &HashSet<VertexId>,
448        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
449    ) -> f64 {
450        let partition_vec: Vec<_> = partition.iter().copied().collect();
451
452        if partition_vec.len() < 100 {
453            return Self::boundary_size_sequential(partition, adjacency);
454        }
455
456        partition_vec
457            .par_iter()
458            .map(|&v| {
459                adjacency
460                    .get(&v)
461                    .map(|neighbors| {
462                        neighbors
463                            .iter()
464                            .filter(|(n, _)| !partition.contains(n))
465                            .map(|(_, w)| w)
466                            .sum::<f64>()
467                    })
468                    .unwrap_or(0.0)
469            })
470            .sum()
471    }
472
473    /// Compute boundary size sequentially
474    #[cfg(not(feature = "rayon"))]
475    pub fn boundary_size_parallel(
476        partition: &HashSet<VertexId>,
477        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
478    ) -> f64 {
479        Self::boundary_size_sequential(partition, adjacency)
480    }
481
482    /// Sequential boundary computation
483    pub fn boundary_size_sequential(
484        partition: &HashSet<VertexId>,
485        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
486    ) -> f64 {
487        partition
488            .iter()
489            .map(|&v| {
490                adjacency
491                    .get(&v)
492                    .map(|neighbors| {
493                        neighbors
494                            .iter()
495                            .filter(|(n, _)| !partition.contains(n))
496                            .map(|(_, w)| w)
497                            .sum::<f64>()
498                    })
499                    .unwrap_or(0.0)
500            })
501            .sum()
502    }
503
504    /// Find minimum degree vertex in parallel
505    #[cfg(feature = "rayon")]
506    pub fn min_degree_vertex_parallel(
507        vertices: &[VertexId],
508        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
509    ) -> Option<(VertexId, usize)> {
510        if vertices.len() < 100 {
511            return Self::min_degree_vertex_sequential(vertices, adjacency);
512        }
513
514        vertices
515            .par_iter()
516            .map(|&v| {
517                let degree = adjacency.get(&v).map(|n| n.len()).unwrap_or(0);
518                (v, degree)
519            })
520            .filter(|(_, d)| *d > 0)
521            .min_by_key(|(_, d)| *d)
522    }
523
524    /// Find minimum degree vertex sequentially
525    #[cfg(not(feature = "rayon"))]
526    pub fn min_degree_vertex_parallel(
527        vertices: &[VertexId],
528        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
529    ) -> Option<(VertexId, usize)> {
530        Self::min_degree_vertex_sequential(vertices, adjacency)
531    }
532
533    /// Sequential minimum degree
534    pub fn min_degree_vertex_sequential(
535        vertices: &[VertexId],
536        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
537    ) -> Option<(VertexId, usize)> {
538        vertices
539            .iter()
540            .map(|&v| {
541                let degree = adjacency.get(&v).map(|n| n.len()).unwrap_or(0);
542                (v, degree)
543            })
544            .filter(|(_, d)| *d > 0)
545            .min_by_key(|(_, d)| *d)
546    }
547}
548
549#[cfg(test)]
550mod tests {
551    use super::*;
552
553    #[test]
554    fn test_work_item_submission() {
555        let scheduler = WorkStealingScheduler::new();
556
557        scheduler.submit(WorkItem {
558            level: 0,
559            vertices: vec![1, 2, 3],
560            priority: 1,
561            estimated_work: 100,
562        });
563
564        scheduler.submit(WorkItem {
565            level: 1,
566            vertices: vec![4, 5, 6],
567            priority: 0, // Higher priority
568            estimated_work: 50,
569        });
570
571        assert_eq!(scheduler.queue_size(), 2);
572
573        // Should steal highest priority first
574        let stolen = scheduler.steal().unwrap();
575        assert_eq!(stolen.level, 1); // Priority 0 comes first
576    }
577
578    #[test]
579    fn test_parallel_updater_min() {
580        let updater = ParallelLevelUpdater::new();
581
582        assert!(updater.global_min().is_infinite());
583
584        assert!(updater.try_update_min(10.0, 0));
585        assert_eq!(updater.global_min(), 10.0);
586        assert_eq!(updater.best_level(), Some(0));
587
588        assert!(updater.try_update_min(5.0, 1));
589        assert_eq!(updater.global_min(), 5.0);
590        assert_eq!(updater.best_level(), Some(1));
591
592        // Should not update with higher value
593        assert!(!updater.try_update_min(7.0, 2));
594        assert_eq!(updater.global_min(), 5.0);
595    }
596
597    #[test]
598    fn test_process_parallel() {
599        let updater = ParallelLevelUpdater::new();
600
601        let levels = vec![0, 1, 2, 3, 4];
602
603        let results = updater.process_parallel(&levels, |level| LevelUpdateResult {
604            level,
605            cut_value: level as f64 * 2.0,
606            partition: HashSet::new(),
607            time_us: 0,
608        });
609
610        assert_eq!(results.len(), 5);
611        assert_eq!(updater.global_min(), 0.0);
612        assert_eq!(updater.best_level(), Some(0));
613    }
614
615    #[test]
616    fn test_boundary_size() {
617        let partition: HashSet<_> = vec![1, 2].into_iter().collect();
618
619        let mut adjacency: HashMap<VertexId, Vec<(VertexId, f64)>> = HashMap::new();
620        adjacency.insert(1, vec![(2, 1.0), (3, 2.0)]);
621        adjacency.insert(2, vec![(1, 1.0), (4, 3.0)]);
622        adjacency.insert(3, vec![(1, 2.0)]);
623        adjacency.insert(4, vec![(2, 3.0)]);
624
625        let boundary = ParallelCutOps::boundary_size_sequential(&partition, &adjacency);
626
627        // Edges crossing: 1-3 (2.0) + 2-4 (3.0) = 5.0
628        assert_eq!(boundary, 5.0);
629    }
630
631    #[test]
632    fn test_min_degree_vertex() {
633        let vertices: Vec<_> = vec![1, 2, 3, 4];
634
635        let mut adjacency: HashMap<VertexId, Vec<(VertexId, f64)>> = HashMap::new();
636        adjacency.insert(1, vec![(2, 1.0), (3, 1.0), (4, 1.0)]);
637        adjacency.insert(2, vec![(1, 1.0)]);
638        adjacency.insert(3, vec![(1, 1.0), (4, 1.0)]);
639        adjacency.insert(4, vec![(1, 1.0), (3, 1.0)]);
640
641        let (min_v, min_deg) =
642            ParallelCutOps::min_degree_vertex_sequential(&vertices, &adjacency).unwrap();
643
644        assert_eq!(min_v, 2);
645        assert_eq!(min_deg, 1);
646    }
647
648    #[test]
649    fn test_scheduler_steal_count() {
650        let scheduler = WorkStealingScheduler::new();
651
652        scheduler.submit(WorkItem {
653            level: 0,
654            vertices: vec![1],
655            priority: 0,
656            estimated_work: 10,
657        });
658
659        assert_eq!(scheduler.steal_count(), 0);
660        let _ = scheduler.steal();
661        assert_eq!(scheduler.steal_count(), 1);
662    }
663
664    #[test]
665    fn test_batch_submit() {
666        let scheduler = WorkStealingScheduler::new();
667
668        let items = vec![
669            WorkItem {
670                level: 0,
671                vertices: vec![],
672                priority: 2,
673                estimated_work: 100,
674            },
675            WorkItem {
676                level: 1,
677                vertices: vec![],
678                priority: 0,
679                estimated_work: 50,
680            },
681            WorkItem {
682                level: 2,
683                vertices: vec![],
684                priority: 1,
685                estimated_work: 75,
686            },
687        ];
688
689        scheduler.submit_batch(items);
690
691        assert_eq!(scheduler.queue_size(), 3);
692
693        // Should be sorted by priority
694        let first = scheduler.steal().unwrap();
695        assert_eq!(first.level, 1); // Priority 0
696    }
697}