ruvector_mincut/parallel/
mod.rs

1//! Parallel distribution for 256-core agentic chip
2//!
3//! Distributes minimum cut computation across WASM cores.
4
5// Internal optimization module - docs on public API in lib.rs
6#![allow(missing_docs)]
7
8use crate::compact::{
9    CompactCoreState, CompactVertexId, CompactEdge,
10    CompactWitness, BitSet256, CoreResult, MAX_EDGES_PER_CORE,
11};
12use core::sync::atomic::{AtomicU8, AtomicU16, Ordering};
13
14// SIMD functions (inlined for non-wasm, uses wasm::simd when available)
15#[cfg(feature = "wasm")]
16use crate::wasm::simd::{simd_boundary_size, simd_popcount};
17
18#[cfg(not(feature = "wasm"))]
19#[inline]
20fn simd_popcount(bits: &[u64; 4]) -> u32 {
21    bits.iter().map(|b| b.count_ones()).sum()
22}
23
24#[cfg(not(feature = "wasm"))]
25#[inline]
26fn simd_boundary_size(set_a: &BitSet256, edges: &[(CompactVertexId, CompactVertexId)]) -> u16 {
27    let mut count = 0u16;
28    for &(src, tgt) in edges {
29        let src_in = set_a.contains(src);
30        let tgt_in = set_a.contains(tgt);
31        if src_in != tgt_in {
32            count += 1;
33        }
34    }
35    count
36}
37
38/// Number of WASM cores
39pub const NUM_CORES: usize = 256;
40
41/// Number of geometric ranges per core
42pub const RANGES_PER_CORE: usize = 1;
43
44/// Total ranges = NUM_CORES × RANGES_PER_CORE
45pub const TOTAL_RANGES: usize = NUM_CORES * RANGES_PER_CORE;
46
47/// Range factor (1.2 from paper)
48pub const RANGE_FACTOR: f32 = 1.2;
49
50/// Core assignment strategy
51#[derive(Clone, Copy, Debug, PartialEq, Eq)]
52#[repr(u8)]
53pub enum CoreStrategy {
54    /// Each core handles one geometric range [1.2^i, 1.2^(i+1)]
55    GeometricRanges = 0,
56    /// Cores handle graph partitions (for very large graphs)
57    GraphPartition = 1,
58    /// Work stealing with dynamic assignment
59    WorkStealing = 2,
60}
61
62/// Message types for inter-core communication (4 bytes)
63#[derive(Clone, Copy)]
64#[repr(C)]
65pub struct CoreMessage {
66    pub msg_type: u8,
67    pub src_core: u8,
68    pub payload: u16,
69}
70
71impl CoreMessage {
72    pub const TYPE_IDLE: u8 = 0;
73    pub const TYPE_WORK_REQUEST: u8 = 1;
74    pub const TYPE_WORK_AVAILABLE: u8 = 2;
75    pub const TYPE_RESULT: u8 = 3;
76    pub const TYPE_SYNC: u8 = 4;
77    pub const TYPE_STEAL_REQUEST: u8 = 5;
78    pub const TYPE_STEAL_RESPONSE: u8 = 6;
79}
80
81/// Lock-free work queue entry
82#[derive(Clone, Copy, Default)]
83#[repr(C)]
84pub struct WorkItem {
85    /// Range index to process
86    pub range_idx: u16,
87    /// Priority (lower = higher priority)
88    pub priority: u8,
89    /// Status
90    pub status: u8,
91}
92
93impl WorkItem {
94    pub const STATUS_PENDING: u8 = 0;
95    pub const STATUS_IN_PROGRESS: u8 = 1;
96    pub const STATUS_COMPLETE: u8 = 2;
97}
98
99/// Shared state for coordination (fits in shared memory)
100#[repr(C, align(64))]
101pub struct SharedCoordinator {
102    /// Global minimum cut found so far
103    pub global_min_cut: AtomicU16,
104    /// Number of cores that have completed (u16 to support NUM_CORES=256)
105    pub completed_cores: AtomicU16,
106    /// Current phase
107    pub phase: AtomicU8,
108    /// Work queue head (for work stealing)
109    pub queue_head: AtomicU16,
110    /// Work queue tail
111    pub queue_tail: AtomicU16,
112    /// Best result core ID
113    pub best_core: AtomicU8,
114    /// Padding for alignment
115    _pad: [u8; 52],
116}
117
118impl SharedCoordinator {
119    pub const PHASE_INIT: u8 = 0;
120    pub const PHASE_DISTRIBUTE: u8 = 1;
121    pub const PHASE_COMPUTE: u8 = 2;
122    pub const PHASE_COLLECT: u8 = 3;
123    pub const PHASE_DONE: u8 = 4;
124
125    pub fn new() -> Self {
126        Self {
127            global_min_cut: AtomicU16::new(u16::MAX),
128            completed_cores: AtomicU16::new(0),
129            phase: AtomicU8::new(Self::PHASE_INIT),
130            queue_head: AtomicU16::new(0),
131            queue_tail: AtomicU16::new(0),
132            best_core: AtomicU8::new(0),
133            _pad: [0; 52],
134        }
135    }
136
137    /// Try to update global minimum (atomic compare-and-swap)
138    pub fn try_update_min(&self, new_min: u16, core_id: u8) -> bool {
139        let mut current = self.global_min_cut.load(Ordering::Acquire);
140        loop {
141            if new_min >= current {
142                return false;
143            }
144            match self.global_min_cut.compare_exchange_weak(
145                current,
146                new_min,
147                Ordering::AcqRel,
148                Ordering::Acquire,
149            ) {
150                Ok(_) => {
151                    self.best_core.store(core_id, Ordering::Release);
152                    return true;
153                }
154                Err(c) => current = c,
155            }
156        }
157    }
158
159    /// Mark core as completed
160    pub fn mark_completed(&self) -> u16 {
161        self.completed_cores.fetch_add(1, Ordering::AcqRel) + 1
162    }
163
164    /// Check if all cores completed
165    pub fn all_completed(&self) -> bool {
166        self.completed_cores.load(Ordering::Acquire) >= NUM_CORES as u16
167    }
168}
169
170/// Compute range bounds for a core
171#[inline]
172pub fn compute_core_range(core_id: u8) -> (u16, u16) {
173    let i = core_id as u32;
174    let lambda_min = (RANGE_FACTOR.powi(i as i32)).floor() as u16;
175    let lambda_max = (RANGE_FACTOR.powi((i + 1) as i32)).floor() as u16;
176    (lambda_min.max(1), lambda_max.max(1))
177}
178
179/// Distribute graph across cores based on strategy
180pub struct CoreDistributor {
181    pub strategy: CoreStrategy,
182    pub num_vertices: u16,
183    pub num_edges: u16,
184}
185
186impl CoreDistributor {
187    pub fn new(strategy: CoreStrategy, num_vertices: u16, num_edges: u16) -> Self {
188        Self { strategy, num_vertices, num_edges }
189    }
190
191    /// Determine which core should handle a vertex
192    #[inline]
193    pub fn vertex_to_core(&self, v: CompactVertexId) -> u8 {
194        match self.strategy {
195            CoreStrategy::GeometricRanges => {
196                // All vertices go to all cores (replicated)
197                0
198            }
199            CoreStrategy::GraphPartition => {
200                // Partition by vertex ID
201                ((v as u32 * NUM_CORES as u32) / self.num_vertices as u32) as u8
202            }
203            CoreStrategy::WorkStealing => {
204                // Dynamic assignment
205                0
206            }
207        }
208    }
209
210    /// Get the range of vertices for a core
211    pub fn core_vertex_range(&self, core_id: u8) -> (CompactVertexId, CompactVertexId) {
212        match self.strategy {
213            CoreStrategy::GeometricRanges => {
214                (0, self.num_vertices)
215            }
216            CoreStrategy::GraphPartition => {
217                let n = self.num_vertices as u32;
218                let start = (core_id as u32 * n) / NUM_CORES as u32;
219                let end = ((core_id as u32 + 1) * n) / NUM_CORES as u32;
220                (start as u16, end as u16)
221            }
222            CoreStrategy::WorkStealing => {
223                (0, self.num_vertices)
224            }
225        }
226    }
227}
228
229/// Per-core execution context
230pub struct CoreExecutor<'a> {
231    /// Core identifier (0-255)
232    pub core_id: u8,
233    /// Core state containing graph and witness data
234    pub state: CompactCoreState,
235    /// Reference to shared coordinator for cross-core synchronization
236    pub coordinator: Option<&'a SharedCoordinator>,
237}
238
239impl<'a> CoreExecutor<'a> {
240    /// Initialize core with its assigned range
241    pub fn init(core_id: u8, coordinator: Option<&'a SharedCoordinator>) -> Self {
242        let (lambda_min, lambda_max) = compute_core_range(core_id);
243
244        let state = CompactCoreState {
245            adjacency: Default::default(),
246            edges: [CompactEdge::default(); MAX_EDGES_PER_CORE],
247            num_vertices: 0,
248            num_edges: 0,
249            min_cut: u16::MAX,
250            best_witness: CompactWitness::default(),
251            lambda_min,
252            lambda_max,
253            core_id,
254            status: CompactCoreState::STATUS_IDLE,
255        };
256
257        Self {
258            core_id,
259            state,
260            coordinator,
261        }
262    }
263
264    /// Add edge to this core's local graph
265    pub fn add_edge(&mut self, src: CompactVertexId, tgt: CompactVertexId, weight: u16) {
266        if self.state.num_edges as usize >= 512 {
267            return; // Full
268        }
269
270        let idx = self.state.num_edges as usize;
271        self.state.edges[idx] = CompactEdge {
272            source: src,
273            target: tgt,
274            weight,
275            flags: CompactEdge::FLAG_ACTIVE,
276        };
277        self.state.num_edges += 1;
278
279        // Track vertices
280        self.state.num_vertices = self.state.num_vertices
281            .max(src + 1)
282            .max(tgt + 1);
283    }
284
285    /// Process this core's assigned range
286    pub fn process(&mut self) -> CoreResult {
287        self.state.status = CompactCoreState::STATUS_PROCESSING;
288
289        // Simple minimum cut via minimum degree heuristic
290        // (Full algorithm would use LocalKCut here)
291        let mut min_degree = u16::MAX;
292        let mut min_vertex = 0u16;
293
294        for v in 0..self.state.num_vertices {
295            let degree = self.compute_degree(v);
296            if degree > 0 && degree < min_degree {
297                min_degree = degree;
298                min_vertex = v;
299            }
300        }
301
302        // Check if in our range
303        if min_degree >= self.state.lambda_min && min_degree <= self.state.lambda_max {
304            self.state.min_cut = min_degree;
305
306            // Create witness
307            let mut membership = BitSet256::new();
308            membership.insert(min_vertex);
309            self.state.best_witness = CompactWitness::new(min_vertex, membership, min_degree);
310
311            // Try to update global minimum
312            if let Some(coord) = self.coordinator {
313                coord.try_update_min(min_degree, self.core_id);
314            }
315        }
316
317        self.state.status = CompactCoreState::STATUS_DONE;
318
319        // Report result
320        if let Some(coord) = self.coordinator {
321            coord.mark_completed();
322        }
323
324        CoreResult {
325            core_id: self.core_id,
326            status: self.state.status,
327            min_cut: self.state.min_cut,
328            witness_hash: self.state.best_witness.hash,
329            witness_seed: self.state.best_witness.seed,
330            witness_cardinality: self.state.best_witness.cardinality,
331            witness_boundary: self.state.best_witness.boundary_size,
332            padding: [0; 4],
333        }
334    }
335
336    /// Compute degree of a vertex
337    fn compute_degree(&self, v: CompactVertexId) -> u16 {
338        let mut degree = 0u16;
339        for i in 0..self.state.num_edges as usize {
340            let edge = &self.state.edges[i];
341            if edge.is_active() && (edge.source == v || edge.target == v) {
342                // Sum weights for weighted min-cut (not edge count)
343                degree = degree.saturating_add(edge.weight);
344            }
345        }
346        degree
347    }
348
349    /// SIMD-accelerated boundary computation for a vertex set
350    ///
351    /// Uses WASM SIMD128 when available for parallel edge checking
352    #[inline]
353    pub fn compute_boundary_simd(&self, set: &BitSet256) -> u16 {
354        // Collect active edges as (source, target) pairs
355        let edges: Vec<(CompactVertexId, CompactVertexId)> = self.state.edges
356            [..self.state.num_edges as usize]
357            .iter()
358            .filter(|e| e.is_active())
359            .map(|e| (e.source, e.target))
360            .collect();
361
362        // Use SIMD-accelerated boundary computation
363        simd_boundary_size(set, &edges)
364    }
365
366    /// SIMD-accelerated population count for membership sets
367    #[inline]
368    pub fn membership_count_simd(&self, set: &BitSet256) -> u32 {
369        simd_popcount(&set.bits)
370    }
371}
372
373/// Result aggregator for collecting results from all cores
374pub struct ResultAggregator {
375    /// Results from each core
376    pub results: [CoreResult; NUM_CORES],
377    /// Index of the best result
378    pub best_idx: usize,
379    /// Global minimum cut value found
380    pub global_min: u16,
381}
382
383impl ResultAggregator {
384    /// Create a new result aggregator
385    pub fn new() -> Self {
386        Self {
387            results: [CoreResult::default(); NUM_CORES],
388            best_idx: 0,
389            global_min: u16::MAX,
390        }
391    }
392
393    /// Add a result from a core and update the best if needed
394    pub fn add_result(&mut self, result: CoreResult) {
395        let idx = result.core_id as usize;
396        self.results[idx] = result;
397
398        if result.min_cut < self.global_min {
399            self.global_min = result.min_cut;
400            self.best_idx = idx;
401        }
402    }
403
404    /// Get the best result (lowest minimum cut)
405    pub fn best_result(&self) -> &CoreResult {
406        &self.results[self.best_idx]
407    }
408}
409
410#[cfg(test)]
411mod tests {
412    use super::*;
413
414    #[test]
415    fn test_compute_core_range() {
416        let (min0, max0) = compute_core_range(0);
417        assert_eq!(min0, 1);
418        assert_eq!(max0, 1);
419
420        let (min10, max10) = compute_core_range(10);
421        assert_eq!(min10, 6);
422        assert_eq!(max10, 7);
423    }
424
425    #[test]
426    fn test_shared_coordinator() {
427        let coord = SharedCoordinator::new();
428
429        assert!(coord.try_update_min(100, 0));
430        assert_eq!(coord.global_min_cut.load(Ordering::Acquire), 100);
431
432        assert!(coord.try_update_min(50, 1));
433        assert_eq!(coord.global_min_cut.load(Ordering::Acquire), 50);
434
435        assert!(!coord.try_update_min(60, 2)); // 60 > 50
436        assert_eq!(coord.global_min_cut.load(Ordering::Acquire), 50);
437    }
438
439    #[test]
440    fn test_core_executor() {
441        let coord = SharedCoordinator::new();
442        let mut exec = CoreExecutor::init(0, Some(&coord));
443
444        exec.add_edge(0, 1, 1);
445        exec.add_edge(1, 2, 1);
446
447        let result = exec.process();
448        assert_eq!(result.core_id, 0);
449    }
450
451    #[test]
452    fn test_result_aggregator() {
453        let mut agg = ResultAggregator::new();
454
455        agg.add_result(CoreResult {
456            core_id: 0,
457            min_cut: 100,
458            ..Default::default()
459        });
460
461        agg.add_result(CoreResult {
462            core_id: 1,
463            min_cut: 50,
464            ..Default::default()
465        });
466
467        assert_eq!(agg.global_min, 50);
468        assert_eq!(agg.best_idx, 1);
469    }
470}