ruvector_mincut/parallel/mod.rs

#![allow(missing_docs)]

use crate::compact::{
    CompactCoreState, CompactVertexId, CompactEdge,
    CompactWitness, BitSet256, CoreResult, MAX_EDGES_PER_CORE,
};
use core::sync::atomic::{AtomicU8, AtomicU16, Ordering};

#[cfg(feature = "wasm")]
use crate::wasm::simd::{simd_boundary_size, simd_popcount};

/// Scalar fallback for `simd_popcount` when the `wasm` feature is disabled.
#[cfg(not(feature = "wasm"))]
#[inline]
fn simd_popcount(bits: &[u64; 4]) -> u32 {
    bits.iter().map(|b| b.count_ones()).sum()
}

/// Scalar fallback for `simd_boundary_size`: counts edges with exactly one
/// endpoint in `set_a`.
#[cfg(not(feature = "wasm"))]
#[inline]
fn simd_boundary_size(set_a: &BitSet256, edges: &[(CompactVertexId, CompactVertexId)]) -> u16 {
    let mut count = 0u16;
    for &(src, tgt) in edges {
        let src_in = set_a.contains(src);
        let tgt_in = set_a.contains(tgt);
        if src_in != tgt_in {
            count += 1;
        }
    }
    count
}

/// Number of logical cores the computation is fanned out across.
pub const NUM_CORES: usize = 256;

/// Number of cut-value ranges assigned to each core.
pub const RANGES_PER_CORE: usize = 1;

/// Total number of ranges across all cores.
pub const TOTAL_RANGES: usize = NUM_CORES * RANGES_PER_CORE;

/// Growth factor between consecutive geometric ranges.
pub const RANGE_FACTOR: f32 = 1.2;

/// Strategy used to distribute work across cores.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum CoreStrategy {
    /// Every core sees the whole graph but probes its own cut-value range.
    GeometricRanges = 0,
    /// Vertices are split into contiguous blocks, one block per core.
    GraphPartition = 1,
    /// Cores pull work items from a shared queue and steal when idle.
    WorkStealing = 2,
}

/// Fixed-size message exchanged between cores.
#[derive(Clone, Copy)]
#[repr(C)]
pub struct CoreMessage {
    pub msg_type: u8,
    pub src_core: u8,
    pub payload: u16,
}

impl CoreMessage {
    pub const TYPE_IDLE: u8 = 0;
    pub const TYPE_WORK_REQUEST: u8 = 1;
    pub const TYPE_WORK_AVAILABLE: u8 = 2;
    pub const TYPE_RESULT: u8 = 3;
    pub const TYPE_SYNC: u8 = 4;
    pub const TYPE_STEAL_REQUEST: u8 = 5;
    pub const TYPE_STEAL_RESPONSE: u8 = 6;
}

/// A unit of schedulable work: one range index plus priority and status.
#[derive(Clone, Copy, Default)]
#[repr(C)]
pub struct WorkItem {
    pub range_idx: u16,
    pub priority: u8,
    pub status: u8,
}

impl WorkItem {
    pub const STATUS_PENDING: u8 = 0;
    pub const STATUS_IN_PROGRESS: u8 = 1;
    pub const STATUS_COMPLETE: u8 = 2;
}

/// Cache-line-aligned coordination state shared by all cores.
#[repr(C, align(64))]
pub struct SharedCoordinator {
    /// Best (smallest) minimum cut found so far across all cores.
    pub global_min_cut: AtomicU16,
    /// Number of cores that have finished processing.
    pub completed_cores: AtomicU16,
    /// Current execution phase (see the `PHASE_*` constants).
    pub phase: AtomicU8,
    /// Work-queue head cursor.
    pub queue_head: AtomicU16,
    /// Work-queue tail cursor.
    pub queue_tail: AtomicU16,
    /// Core that produced the current global minimum.
    pub best_core: AtomicU8,
    _pad: [u8; 52],
}

impl SharedCoordinator {
    pub const PHASE_INIT: u8 = 0;
    pub const PHASE_DISTRIBUTE: u8 = 1;
    pub const PHASE_COMPUTE: u8 = 2;
    pub const PHASE_COLLECT: u8 = 3;
    pub const PHASE_DONE: u8 = 4;

    pub fn new() -> Self {
        Self {
            global_min_cut: AtomicU16::new(u16::MAX),
            completed_cores: AtomicU16::new(0),
            phase: AtomicU8::new(Self::PHASE_INIT),
            queue_head: AtomicU16::new(0),
            queue_tail: AtomicU16::new(0),
            best_core: AtomicU8::new(0),
            _pad: [0; 52],
        }
    }

    /// Atomically lowers the global minimum cut with a CAS loop.
    /// Returns `true` if `new_min` became the new global minimum.
    pub fn try_update_min(&self, new_min: u16, core_id: u8) -> bool {
        let mut current = self.global_min_cut.load(Ordering::Acquire);
        loop {
            if new_min >= current {
                return false;
            }
            match self.global_min_cut.compare_exchange_weak(
                current,
                new_min,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    self.best_core.store(core_id, Ordering::Release);
                    return true;
                }
                Err(c) => current = c,
            }
        }
    }

    /// Marks this core as finished and returns the updated completion count.
    pub fn mark_completed(&self) -> u16 {
        self.completed_cores.fetch_add(1, Ordering::AcqRel) + 1
    }

    /// Returns `true` once every core has reported completion.
    pub fn all_completed(&self) -> bool {
        self.completed_cores.load(Ordering::Acquire) >= NUM_CORES as u16
    }
}

/// Cut-value range probed by core `i`:
/// `[floor(RANGE_FACTOR^i), floor(RANGE_FACTOR^(i+1))]`, clamped to at least 1.
#[inline]
pub fn compute_core_range(core_id: u8) -> (u16, u16) {
    let i = core_id as u32;
    let lambda_min = (RANGE_FACTOR.powi(i as i32)).floor() as u16;
    let lambda_max = (RANGE_FACTOR.powi((i + 1) as i32)).floor() as u16;
    (lambda_min.max(1), lambda_max.max(1))
}

/// Maps vertices to cores according to the chosen `CoreStrategy`.
pub struct CoreDistributor {
    pub strategy: CoreStrategy,
    pub num_vertices: u16,
    pub num_edges: u16,
}

impl CoreDistributor {
    pub fn new(strategy: CoreStrategy, num_vertices: u16, num_edges: u16) -> Self {
        Self { strategy, num_vertices, num_edges }
    }

    /// Returns the core responsible for vertex `v`.
    #[inline]
    pub fn vertex_to_core(&self, v: CompactVertexId) -> u8 {
        match self.strategy {
            // Every core sees the whole graph; only the cut-value range differs.
            CoreStrategy::GeometricRanges => 0,
            // Contiguous vertex blocks, one per core.
            CoreStrategy::GraphPartition => {
                ((v as u32 * NUM_CORES as u32) / self.num_vertices as u32) as u8
            }
            // Ownership is decided dynamically by the work queue.
            CoreStrategy::WorkStealing => 0,
        }
    }

    /// Half-open vertex range `[start, end)` owned by `core_id`.
    pub fn core_vertex_range(&self, core_id: u8) -> (CompactVertexId, CompactVertexId) {
        match self.strategy {
            CoreStrategy::GeometricRanges => (0, self.num_vertices),
            CoreStrategy::GraphPartition => {
                let n = self.num_vertices as u32;
                let start = (core_id as u32 * n) / NUM_CORES as u32;
                let end = ((core_id as u32 + 1) * n) / NUM_CORES as u32;
                (start as u16, end as u16)
            }
            CoreStrategy::WorkStealing => (0, self.num_vertices),
        }
    }
}

/// Per-core executor: owns a compact copy of the graph and produces a
/// `CoreResult` for its cut-value range.
pub struct CoreExecutor<'a> {
    pub core_id: u8,
    pub state: CompactCoreState,
    pub coordinator: Option<&'a SharedCoordinator>,
}

impl<'a> CoreExecutor<'a> {
    /// Creates an executor for `core_id` with its geometric range and empty state.
    pub fn init(core_id: u8, coordinator: Option<&'a SharedCoordinator>) -> Self {
        let (lambda_min, lambda_max) = compute_core_range(core_id);

        let state = CompactCoreState {
            adjacency: Default::default(),
            edges: [CompactEdge::default(); MAX_EDGES_PER_CORE],
            num_vertices: 0,
            num_edges: 0,
            min_cut: u16::MAX,
            best_witness: CompactWitness::default(),
            lambda_min,
            lambda_max,
            core_id,
            status: CompactCoreState::STATUS_IDLE,
        };

        Self {
            core_id,
            state,
            coordinator,
        }
    }

    /// Appends an edge and grows the vertex count; edges beyond the
    /// per-core capacity are silently dropped.
    pub fn add_edge(&mut self, src: CompactVertexId, tgt: CompactVertexId, weight: u16) {
        if self.state.num_edges as usize >= MAX_EDGES_PER_CORE {
            return;
        }

        let idx = self.state.num_edges as usize;
        self.state.edges[idx] = CompactEdge {
            source: src,
            target: tgt,
            weight,
            flags: CompactEdge::FLAG_ACTIVE,
        };
        self.state.num_edges += 1;

        self.state.num_vertices = self.state.num_vertices
            .max(src + 1)
            .max(tgt + 1);
    }

    /// Runs the degree-based scan: the smallest positive weighted degree is an
    /// upper bound on the minimum cut. The candidate is recorded (and reported
    /// to the coordinator) only if it falls inside this core's range.
    pub fn process(&mut self) -> CoreResult {
        self.state.status = CompactCoreState::STATUS_PROCESSING;

        let mut min_degree = u16::MAX;
        let mut min_vertex = 0u16;

        for v in 0..self.state.num_vertices {
            let degree = self.compute_degree(v);
            if degree > 0 && degree < min_degree {
                min_degree = degree;
                min_vertex = v;
            }
        }

        if min_degree >= self.state.lambda_min && min_degree <= self.state.lambda_max {
            self.state.min_cut = min_degree;

            // Witness: the singleton set containing the minimum-degree vertex.
            let mut membership = BitSet256::new();
            membership.insert(min_vertex);
            self.state.best_witness = CompactWitness::new(min_vertex, membership, min_degree);

            if let Some(coord) = self.coordinator {
                coord.try_update_min(min_degree, self.core_id);
            }
        }

        self.state.status = CompactCoreState::STATUS_DONE;

        if let Some(coord) = self.coordinator {
            coord.mark_completed();
        }

        CoreResult {
            core_id: self.core_id,
            status: self.state.status,
            min_cut: self.state.min_cut,
            witness_hash: self.state.best_witness.hash,
            witness_seed: self.state.best_witness.seed,
            witness_cardinality: self.state.best_witness.cardinality,
            witness_boundary: self.state.best_witness.boundary_size,
            padding: [0; 4],
        }
    }

    /// Sum of weights of active edges incident to `v`.
    fn compute_degree(&self, v: CompactVertexId) -> u16 {
        let mut degree = 0u16;
        for i in 0..self.state.num_edges as usize {
            let edge = &self.state.edges[i];
            if edge.is_active() && (edge.source == v || edge.target == v) {
                degree = degree.saturating_add(edge.weight);
            }
        }
        degree
    }

    /// Counts edges crossing `set`, using the SIMD kernel when available.
    #[inline]
    pub fn compute_boundary_simd(&self, set: &BitSet256) -> u16 {
        let edges: Vec<(CompactVertexId, CompactVertexId)> = self.state.edges
            [..self.state.num_edges as usize]
            .iter()
            .filter(|e| e.is_active())
            .map(|e| (e.source, e.target))
            .collect();

        simd_boundary_size(set, &edges)
    }

    /// Number of vertices in `set`, via SIMD popcount when available.
    #[inline]
    pub fn membership_count_simd(&self, set: &BitSet256) -> u32 {
        simd_popcount(&set.bits)
    }
}

/// Collects per-core results and tracks the global best.
pub struct ResultAggregator {
    pub results: [CoreResult; NUM_CORES],
    pub best_idx: usize,
    pub global_min: u16,
}

impl ResultAggregator {
    pub fn new() -> Self {
        Self {
            results: [CoreResult::default(); NUM_CORES],
            best_idx: 0,
            global_min: u16::MAX,
        }
    }

    /// Records `result` and updates the running global minimum.
    pub fn add_result(&mut self, result: CoreResult) {
        let idx = result.core_id as usize;
        self.results[idx] = result;

        if result.min_cut < self.global_min {
            self.global_min = result.min_cut;
            self.best_idx = idx;
        }
    }

    /// The result with the smallest minimum cut seen so far.
    pub fn best_result(&self) -> &CoreResult {
        &self.results[self.best_idx]
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_compute_core_range() {
        let (min0, max0) = compute_core_range(0);
        assert_eq!(min0, 1);
        assert_eq!(max0, 1);

        let (min10, max10) = compute_core_range(10);
        assert_eq!(min10, 6);
        assert_eq!(max10, 7);
    }

    #[test]
    fn test_shared_coordinator() {
        let coord = SharedCoordinator::new();

        assert!(coord.try_update_min(100, 0));
        assert_eq!(coord.global_min_cut.load(Ordering::Acquire), 100);

        assert!(coord.try_update_min(50, 1));
        assert_eq!(coord.global_min_cut.load(Ordering::Acquire), 50);

        assert!(!coord.try_update_min(60, 2));
        assert_eq!(coord.global_min_cut.load(Ordering::Acquire), 50);
    }
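
    // Illustrative sketch (assumed test, not part of the original suite):
    // GraphPartition should keep vertex_to_core consistent with
    // core_vertex_range. Uses 1024 vertices so the 256 blocks divide evenly.
    #[test]
    fn test_graph_partition_distribution() {
        let dist = CoreDistributor::new(CoreStrategy::GraphPartition, 1024, 0);

        for core_id in 0..=255u8 {
            let (start, end) = dist.core_vertex_range(core_id);
            for v in start..end {
                assert_eq!(dist.vertex_to_core(v), core_id);
            }
        }
    }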

    #[test]
    fn test_core_executor() {
        let coord = SharedCoordinator::new();
        let mut exec = CoreExecutor::init(0, Some(&coord));

        exec.add_edge(0, 1, 1);
        exec.add_edge(1, 2, 1);

        let result = exec.process();
        assert_eq!(result.core_id, 0);
    }
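
    // Illustrative sketch (assumed test, not part of the original suite): with
    // edges (0,1) and (1,2), the set {0, 1} is crossed only by edge (1,2) and
    // holds two vertices. Exercises the scalar fallbacks; the wasm SIMD
    // kernels are expected to agree.
    #[test]
    fn test_boundary_and_popcount() {
        let mut exec = CoreExecutor::init(0, None);
        exec.add_edge(0, 1, 1);
        exec.add_edge(1, 2, 1);

        let mut set = BitSet256::new();
        set.insert(0);
        set.insert(1);

        assert_eq!(exec.compute_boundary_simd(&set), 1);
        assert_eq!(exec.membership_count_simd(&set), 2);
    }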

    #[test]
    fn test_result_aggregator() {
        let mut agg = ResultAggregator::new();

        agg.add_result(CoreResult {
            core_id: 0,
            min_cut: 100,
            ..Default::default()
        });

        agg.add_result(CoreResult {
            core_id: 1,
            min_cut: 50,
            ..Default::default()
        });

        assert_eq!(agg.global_min, 50);
        assert_eq!(agg.best_idx, 1);
    }
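
    // Illustrative end-to-end sketch (assumed test, not part of the original
    // suite): a few executors run against one shared coordinator and their
    // results are folded into an aggregator. Uses 4 cores instead of all
    // NUM_CORES for brevity.
    #[test]
    fn test_fan_out_and_aggregate() {
        let coord = SharedCoordinator::new();
        let mut agg = ResultAggregator::new();

        for core_id in 0..4u8 {
            let mut exec = CoreExecutor::init(core_id, Some(&coord));
            exec.add_edge(0, 1, 1);
            exec.add_edge(1, 2, 1);
            agg.add_result(exec.process());
        }

        // Every executor reports completion to the coordinator.
        assert_eq!(coord.completed_cores.load(Ordering::Acquire), 4);
        // Several low cores cover a range containing the minimum degree 1; the
        // aggregator keeps the first core that reported the global minimum.
        assert_eq!(agg.global_min, 1);
        assert_eq!(agg.best_result().core_id, 0);
    }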
}