//! Parallel utilities for level updates: a priority work queue with
//! stealing, an atomic tracker for the best cut found so far, and
//! parallel helpers for cut computations (backed by `rayon` when the
//! `rayon` feature is enabled).

use crate::graph::VertexId;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

#[cfg(feature = "rayon")]
use rayon::prelude::*;

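/// Configuration for parallel level-update execution.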
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Inputs smaller than this are processed sequentially.
    pub min_parallel_size: usize,
    /// Number of worker threads to use.
    pub num_threads: usize,
    /// Whether work stealing is enabled.
    pub work_stealing: bool,
    /// Preferred chunk size when splitting work.
    pub chunk_size: usize,
    /// Whether scheduling adapts to the observed workload.
    pub adaptive: bool,
}

impl Default for ParallelConfig {
    fn default() -> Self {
        Self {
            min_parallel_size: 100,
            num_threads: 0,
            work_stealing: true,
            chunk_size: 64,
            adaptive: true,
        }
    }
}

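/// A schedulable unit of work: one level to update plus the vertices involved.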
#[derive(Debug, Clone)]
pub struct WorkItem {
    /// Level this work item updates.
    pub level: usize,
    /// Vertices involved in the update.
    pub vertices: Vec<VertexId>,
    /// Scheduling priority; lower values are scheduled first.
    pub priority: u32,
    /// Estimated cost, used for work accounting.
    pub estimated_work: usize,
}

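/// Result of recomputing a single level.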
#[derive(Debug, Clone)]
pub struct LevelUpdateResult {
    /// Level that was processed.
    pub level: usize,
    /// Cut value found at this level.
    pub cut_value: f64,
    /// Vertex set on one side of the cut.
    pub partition: HashSet<VertexId>,
    /// Processing time in microseconds.
    pub time_us: u64,
}

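/// Shared scheduler that keeps a priority-ordered queue of [`WorkItem`]s and
/// collects per-level results.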
pub struct WorkStealingScheduler {
    config: ParallelConfig,
    /// Pending work, kept sorted so the lowest `priority` value is at the front.
    work_queue: RwLock<Vec<WorkItem>>,
    /// Completed results, keyed by level.
    results: RwLock<HashMap<usize, LevelUpdateResult>>,
    /// Number of workers currently active.
    active_workers: AtomicUsize,
    /// Running sum of `estimated_work` over all submitted items.
    total_work: AtomicU64,
    /// Number of successful steals.
    steals: AtomicU64,
}

impl WorkStealingScheduler {
    /// Creates a scheduler with the default configuration.
    pub fn new() -> Self {
        Self::with_config(ParallelConfig::default())
    }

    /// Creates a scheduler with the given configuration.
    pub fn with_config(config: ParallelConfig) -> Self {
        Self {
            config,
            work_queue: RwLock::new(Vec::new()),
            results: RwLock::new(HashMap::new()),
            active_workers: AtomicUsize::new(0),
            total_work: AtomicU64::new(0),
            steals: AtomicU64::new(0),
        }
    }

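    /// Adds a work item and re-sorts the queue so that the lowest `priority`
    /// value sits at the front.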
    pub fn submit(&self, item: WorkItem) {
        let mut queue = self.work_queue.write().unwrap();
        let estimated_work = item.estimated_work;
        queue.push(item);

        // Keep the front of the queue at the highest priority (lowest value).
        queue.sort_by_key(|w| w.priority);

        self.total_work
            .fetch_add(estimated_work as u64, Ordering::Relaxed);
    }

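    /// Adds several work items at once, re-sorting the queue a single time.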
    pub fn submit_batch(&self, items: Vec<WorkItem>) {
        let mut queue = self.work_queue.write().unwrap();

        for item in items {
            self.total_work
                .fetch_add(item.estimated_work as u64, Ordering::Relaxed);
            queue.push(item);
        }

        queue.sort_by_key(|w| w.priority);
    }

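    /// Removes and returns the highest-priority (lowest `priority` value)
    /// item, if any, and counts the steal.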
    pub fn steal(&self) -> Option<WorkItem> {
        let mut queue = self.work_queue.write().unwrap();

        if queue.is_empty() {
            return None;
        }

        self.steals.fetch_add(1, Ordering::Relaxed);

        // The queue is sorted ascending by priority, so the front item is next.
        Some(queue.remove(0))
    }

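    /// Records the result for a finished level, replacing any previous result
    /// for that level.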
    pub fn complete(&self, result: LevelUpdateResult) {
        let mut results = self.results.write().unwrap();
        results.insert(result.level, result);
    }

    /// Returns a snapshot of all recorded results, keyed by level.
    pub fn get_results(&self) -> HashMap<usize, LevelUpdateResult> {
        self.results.read().unwrap().clone()
    }

    /// Clears all recorded results.
    pub fn clear_results(&self) {
        self.results.write().unwrap().clear();
    }

    /// Returns `true` if no work is queued.
    pub fn is_empty(&self) -> bool {
        self.work_queue.read().unwrap().is_empty()
    }

    /// Returns the number of queued work items.
    pub fn queue_size(&self) -> usize {
        self.work_queue.read().unwrap().len()
    }

    /// Returns how many items have been stolen so far.
    pub fn steal_count(&self) -> u64 {
        self.steals.load(Ordering::Relaxed)
    }
}

impl Default for WorkStealingScheduler {
    fn default() -> Self {
        Self::new()
    }
}

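/// Coordinates parallel level updates and tracks the smallest cut value seen
/// so far using lock-free atomics.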
pub struct ParallelLevelUpdater {
    config: ParallelConfig,
    scheduler: Arc<WorkStealingScheduler>,
    /// Smallest cut value seen so far, stored as `f64` bits.
    global_min: AtomicU64,
    /// Level that produced `global_min` (`usize::MAX` means none yet).
    best_level: AtomicUsize,
}

impl ParallelLevelUpdater {
    /// Creates an updater with the default configuration.
    pub fn new() -> Self {
        Self::with_config(ParallelConfig::default())
    }

    /// Creates an updater with the given configuration.
    pub fn with_config(config: ParallelConfig) -> Self {
        Self {
            scheduler: Arc::new(WorkStealingScheduler::with_config(config.clone())),
            config,
            global_min: AtomicU64::new(f64::INFINITY.to_bits()),
            best_level: AtomicUsize::new(usize::MAX),
        }
    }

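    /// Attempts to lower the tracked minimum to `value` via a compare-exchange
    /// loop on the bit pattern of the `f64`. Returns `true` if `value` became
    /// the new minimum, in which case `level` is recorded as the best level.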
    pub fn try_update_min(&self, value: f64, level: usize) -> bool {
        let value_bits = value.to_bits();
        let mut current = self.global_min.load(Ordering::Acquire);

        loop {
            let current_value = f64::from_bits(current);
            if value >= current_value {
                return false;
            }

            match self.global_min.compare_exchange_weak(
                current,
                value_bits,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    self.best_level.store(level, Ordering::Release);
                    return true;
                }
                Err(c) => current = c,
            }
        }
    }

    /// Returns the smallest cut value recorded so far (infinity if none).
    pub fn global_min(&self) -> f64 {
        f64::from_bits(self.global_min.load(Ordering::Acquire))
    }

    /// Returns the level that produced the current minimum, if any.
    pub fn best_level(&self) -> Option<usize> {
        let level = self.best_level.load(Ordering::Acquire);
        if level == usize::MAX {
            None
        } else {
            Some(level)
        }
    }

    /// Resets the tracked minimum and best level.
    pub fn reset_min(&self) {
        self.global_min
            .store(f64::INFINITY.to_bits(), Ordering::Release);
        self.best_level.store(usize::MAX, Ordering::Release);
    }

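    /// Processes the given levels, in parallel when the `rayon` feature is
    /// enabled and there are at least `min_parallel_size` of them; every
    /// result also updates the tracked global minimum.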
    #[cfg(feature = "rayon")]
    pub fn process_parallel<F>(&self, levels: &[usize], process_fn: F) -> Vec<LevelUpdateResult>
    where
        F: FnMut(usize) -> LevelUpdateResult + Send + Sync + Clone,
    {
        let size = levels.len();

        if size < self.config.min_parallel_size {
            return levels
                .iter()
                .map(|&level| {
                    let result = process_fn.clone()(level);
                    self.try_update_min(result.cut_value, level);
                    result
                })
                .collect();
        }

        levels
            .par_iter()
            .map(|&level| {
                let result = process_fn.clone()(level);
                self.try_update_min(result.cut_value, level);
                result
            })
            .collect()
    }

    /// Sequential fallback used when the `rayon` feature is disabled.
    #[cfg(not(feature = "rayon"))]
    pub fn process_parallel<F>(&self, levels: &[usize], process_fn: F) -> Vec<LevelUpdateResult>
    where
        F: FnMut(usize) -> LevelUpdateResult + Clone,
    {
        levels
            .iter()
            .map(|&level| {
                let result = process_fn.clone()(level);
                self.try_update_min(result.cut_value, level);
                result
            })
            .collect()
    }

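    /// Processes a batch of work items, in parallel when the `rayon` feature
    /// is enabled and the batch is large enough; every result also updates
    /// the tracked global minimum.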
    #[cfg(feature = "rayon")]
    pub fn process_with_stealing<F>(
        &self,
        work_items: Vec<WorkItem>,
        process_fn: F,
    ) -> Vec<LevelUpdateResult>
    where
        F: Fn(&WorkItem) -> LevelUpdateResult + Send + Sync,
    {
        if work_items.len() < self.config.min_parallel_size {
            return work_items
                .iter()
                .map(|item| {
                    let result = process_fn(item);
                    self.try_update_min(result.cut_value, item.level);
                    result
                })
                .collect();
        }

        work_items
            .par_iter()
            .map(|item| {
                let result = process_fn(item);
                self.try_update_min(result.cut_value, item.level);
                result
            })
            .collect()
    }

    /// Sequential fallback used when the `rayon` feature is disabled.
    #[cfg(not(feature = "rayon"))]
    pub fn process_with_stealing<F>(
        &self,
        work_items: Vec<WorkItem>,
        process_fn: F,
    ) -> Vec<LevelUpdateResult>
    where
        F: Fn(&WorkItem) -> LevelUpdateResult,
    {
        work_items
            .iter()
            .map(|item| {
                let result = process_fn(item);
                self.try_update_min(result.cut_value, item.level);
                result
            })
            .collect()
    }

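    /// Maps `process_fn` over the vertices, in parallel for large inputs when
    /// the `rayon` feature is enabled.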
    #[cfg(feature = "rayon")]
    pub fn process_vertices_parallel<F, R>(&self, vertices: &[VertexId], process_fn: F) -> Vec<R>
    where
        F: Fn(VertexId) -> R + Send + Sync,
        R: Send,
    {
        if vertices.len() < self.config.min_parallel_size {
            return vertices.iter().map(|&v| process_fn(v)).collect();
        }

        vertices.par_iter().map(|&v| process_fn(v)).collect()
    }

    /// Sequential fallback used when the `rayon` feature is disabled.
    #[cfg(not(feature = "rayon"))]
    pub fn process_vertices_parallel<F, R>(&self, vertices: &[VertexId], process_fn: F) -> Vec<R>
    where
        F: Fn(VertexId) -> R,
    {
        vertices.iter().map(|&v| process_fn(v)).collect()
    }

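    /// Map-reduce over `items`: applies `map_fn` to each item and combines the
    /// results with `reduce_fn`. For the parallel and sequential paths to
    /// agree, `reduce_fn` should be associative with `identity` as its neutral
    /// element.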
    #[cfg(feature = "rayon")]
    pub fn parallel_reduce<T, F, R>(
        &self,
        items: &[T],
        identity: R,
        map_fn: F,
        reduce_fn: fn(R, R) -> R,
    ) -> R
    where
        T: Sync,
        F: Fn(&T) -> R + Send + Sync,
        // `Sync` is required so the identity closure can be shared across
        // rayon worker threads.
        R: Send + Sync + Clone,
    {
        if items.len() < self.config.min_parallel_size {
            return items
                .iter()
                .map(|item| map_fn(item))
                .fold(identity.clone(), reduce_fn);
        }

        items
            .par_iter()
            .map(|item| map_fn(item))
            .reduce(|| identity.clone(), reduce_fn)
    }

    /// Sequential fallback used when the `rayon` feature is disabled.
    #[cfg(not(feature = "rayon"))]
    pub fn parallel_reduce<T, F, R>(
        &self,
        items: &[T],
        identity: R,
        map_fn: F,
        reduce_fn: fn(R, R) -> R,
    ) -> R
    where
        F: Fn(&T) -> R,
        R: Clone,
    {
        items
            .iter()
            .map(|item| map_fn(item))
            .fold(identity, reduce_fn)
    }

    /// Returns a handle to the shared work-stealing scheduler.
    pub fn scheduler(&self) -> &Arc<WorkStealingScheduler> {
        &self.scheduler
    }
}

impl Default for ParallelLevelUpdater {
    fn default() -> Self {
        Self::new()
    }
}

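/// Stateless helpers for cut computations over an adjacency map, with
/// parallel variants available behind the `rayon` feature.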
pub struct ParallelCutOps;

impl ParallelCutOps {
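    /// Total weight of edges crossing from `partition` to vertices outside it,
    /// computed in parallel for partitions of at least 100 vertices when the
    /// `rayon` feature is enabled.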
    #[cfg(feature = "rayon")]
    pub fn boundary_size_parallel(
        partition: &HashSet<VertexId>,
        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
    ) -> f64 {
        let partition_vec: Vec<_> = partition.iter().copied().collect();

        if partition_vec.len() < 100 {
            return Self::boundary_size_sequential(partition, adjacency);
        }

        partition_vec
            .par_iter()
            .map(|&v| {
                adjacency
                    .get(&v)
                    .map(|neighbors| {
                        neighbors
                            .iter()
                            .filter(|(n, _)| !partition.contains(n))
                            .map(|(_, w)| w)
                            .sum::<f64>()
                    })
                    .unwrap_or(0.0)
            })
            .sum()
    }

    /// Sequential fallback used when the `rayon` feature is disabled.
    #[cfg(not(feature = "rayon"))]
    pub fn boundary_size_parallel(
        partition: &HashSet<VertexId>,
        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
    ) -> f64 {
        Self::boundary_size_sequential(partition, adjacency)
    }

    /// Sequential implementation of the boundary-weight computation.
    pub fn boundary_size_sequential(
        partition: &HashSet<VertexId>,
        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
    ) -> f64 {
        partition
            .iter()
            .map(|&v| {
                adjacency
                    .get(&v)
                    .map(|neighbors| {
                        neighbors
                            .iter()
                            .filter(|(n, _)| !partition.contains(n))
                            .map(|(_, w)| w)
                            .sum::<f64>()
                    })
                    .unwrap_or(0.0)
            })
            .sum()
    }

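    /// Finds a vertex of minimum non-zero degree, in parallel for inputs of at
    /// least 100 vertices when the `rayon` feature is enabled. Returns `None`
    /// if every vertex has degree zero.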
    #[cfg(feature = "rayon")]
    pub fn min_degree_vertex_parallel(
        vertices: &[VertexId],
        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
    ) -> Option<(VertexId, usize)> {
        if vertices.len() < 100 {
            return Self::min_degree_vertex_sequential(vertices, adjacency);
        }

        vertices
            .par_iter()
            .map(|&v| {
                let degree = adjacency.get(&v).map(|n| n.len()).unwrap_or(0);
                (v, degree)
            })
            .filter(|(_, d)| *d > 0)
            .min_by_key(|(_, d)| *d)
    }

    /// Sequential fallback used when the `rayon` feature is disabled.
    #[cfg(not(feature = "rayon"))]
    pub fn min_degree_vertex_parallel(
        vertices: &[VertexId],
        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
    ) -> Option<(VertexId, usize)> {
        Self::min_degree_vertex_sequential(vertices, adjacency)
    }

    /// Sequential implementation of the minimum-degree search.
    pub fn min_degree_vertex_sequential(
        vertices: &[VertexId],
        adjacency: &HashMap<VertexId, Vec<(VertexId, f64)>>,
    ) -> Option<(VertexId, usize)> {
        vertices
            .iter()
            .map(|&v| {
                let degree = adjacency.get(&v).map(|n| n.len()).unwrap_or(0);
                (v, degree)
            })
            .filter(|(_, d)| *d > 0)
            .min_by_key(|(_, d)| *d)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_work_item_submission() {
        let scheduler = WorkStealingScheduler::new();

        scheduler.submit(WorkItem {
            level: 0,
            vertices: vec![1, 2, 3],
            priority: 1,
            estimated_work: 100,
        });

        scheduler.submit(WorkItem {
            level: 1,
            vertices: vec![4, 5, 6],
            priority: 0,
            estimated_work: 50,
        });

        assert_eq!(scheduler.queue_size(), 2);

        // The level-1 item has the lower priority value, so it is stolen first.
        let stolen = scheduler.steal().unwrap();
        assert_eq!(stolen.level, 1);
    }

    #[test]
    fn test_parallel_updater_min() {
        let updater = ParallelLevelUpdater::new();

        assert!(updater.global_min().is_infinite());

        assert!(updater.try_update_min(10.0, 0));
        assert_eq!(updater.global_min(), 10.0);
        assert_eq!(updater.best_level(), Some(0));

        assert!(updater.try_update_min(5.0, 1));
        assert_eq!(updater.global_min(), 5.0);
        assert_eq!(updater.best_level(), Some(1));

        // A larger value must not replace the current minimum.
        assert!(!updater.try_update_min(7.0, 2));
        assert_eq!(updater.global_min(), 5.0);
    }

    #[test]
    fn test_process_parallel() {
        let updater = ParallelLevelUpdater::new();

        let levels = vec![0, 1, 2, 3, 4];

        let results = updater.process_parallel(&levels, |level| LevelUpdateResult {
            level,
            cut_value: level as f64 * 2.0,
            partition: HashSet::new(),
            time_us: 0,
        });

        assert_eq!(results.len(), 5);
        assert_eq!(updater.global_min(), 0.0);
        assert_eq!(updater.best_level(), Some(0));
    }

    #[test]
    fn test_boundary_size() {
        let partition: HashSet<_> = vec![1, 2].into_iter().collect();

        let mut adjacency: HashMap<VertexId, Vec<(VertexId, f64)>> = HashMap::new();
        adjacency.insert(1, vec![(2, 1.0), (3, 2.0)]);
        adjacency.insert(2, vec![(1, 1.0), (4, 3.0)]);
        adjacency.insert(3, vec![(1, 2.0)]);
        adjacency.insert(4, vec![(2, 3.0)]);

        let boundary = ParallelCutOps::boundary_size_sequential(&partition, &adjacency);

        // Crossing edges: 1 -> 3 (weight 2.0) and 2 -> 4 (weight 3.0).
        assert_eq!(boundary, 5.0);
    }

    #[test]
    fn test_min_degree_vertex() {
        let vertices: Vec<_> = vec![1, 2, 3, 4];

        let mut adjacency: HashMap<VertexId, Vec<(VertexId, f64)>> = HashMap::new();
        adjacency.insert(1, vec![(2, 1.0), (3, 1.0), (4, 1.0)]);
        adjacency.insert(2, vec![(1, 1.0)]);
        adjacency.insert(3, vec![(1, 1.0), (4, 1.0)]);
        adjacency.insert(4, vec![(1, 1.0), (3, 1.0)]);

        let (min_v, min_deg) =
            ParallelCutOps::min_degree_vertex_sequential(&vertices, &adjacency).unwrap();

        assert_eq!(min_v, 2);
        assert_eq!(min_deg, 1);
    }

    #[test]
    fn test_scheduler_steal_count() {
        let scheduler = WorkStealingScheduler::new();

        scheduler.submit(WorkItem {
            level: 0,
            vertices: vec![1],
            priority: 0,
            estimated_work: 10,
        });

        assert_eq!(scheduler.steal_count(), 0);
        let _ = scheduler.steal();
        assert_eq!(scheduler.steal_count(), 1);
    }

    #[test]
    fn test_batch_submit() {
        let scheduler = WorkStealingScheduler::new();

        let items = vec![
            WorkItem {
                level: 0,
                vertices: vec![],
                priority: 2,
                estimated_work: 100,
            },
            WorkItem {
                level: 1,
                vertices: vec![],
                priority: 0,
                estimated_work: 50,
            },
            WorkItem {
                level: 2,
                vertices: vec![],
                priority: 1,
                estimated_work: 75,
            },
        ];

        scheduler.submit_batch(items);

        assert_eq!(scheduler.queue_size(), 3);

        // The priority-0 item (level 1) should be stolen first.
        let first = scheduler.steal().unwrap();
        assert_eq!(first.level, 1);
    }
}