
ruvector_mincut/optimization/wasm_batch.rs

//! WASM Batch Operations and TypedArray Optimizations
//!
//! Optimizations specific to WebAssembly execution:
//! - Batch FFI calls to minimize overhead
//! - Pre-allocated WASM memory
//! - TypedArray bulk transfers
//! - Memory alignment for SIMD
//!
//! Target: 10x reduction in FFI overhead
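//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; the `use` path below assumes this
//! module is exported as `optimization::wasm_batch` from the crate root):
//!
//! ```ignore
//! use ruvector_mincut::optimization::wasm_batch::WasmBatchOps;
//!
//! let mut batch = WasmBatchOps::new();
//! batch.queue_insert_edges(vec![(1, 2, 1.0), (2, 3, 0.5)]);
//! batch.queue_distance_queries(vec![(1, 3)]);
//!
//! // A single call executes everything that was queued.
//! let results = batch.execute_batch();
//! assert_eq!(results.len(), 2);
//! ```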

use crate::graph::VertexId;

/// Configuration for WASM batch operations
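///
/// # Example
///
/// A small sketch of overriding selected fields while keeping the defaults:
///
/// ```ignore
/// let config = BatchConfig {
///     max_batch_size: 256,
///     ..Default::default()
/// };
/// assert_eq!(config.alignment, 64);
/// assert!(config.memory_pooling);
/// ```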
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum batch size
    pub max_batch_size: usize,
    /// Pre-allocated buffer size in bytes
    pub buffer_size: usize,
    /// Alignment for SIMD operations
    pub alignment: usize,
    /// Enable memory pooling
    pub memory_pooling: bool,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            max_batch_size: 1024,
            buffer_size: 64 * 1024, // 64KB
            alignment: 64,          // AVX-512 alignment
            memory_pooling: true,
        }
    }
}

/// Batch operation types for minimizing FFI calls
#[derive(Debug, Clone)]
pub enum BatchOperation {
    /// Insert multiple edges
    InsertEdges(Vec<(VertexId, VertexId, f64)>),
    /// Delete multiple edges
    DeleteEdges(Vec<(VertexId, VertexId)>),
    /// Update multiple weights
    UpdateWeights(Vec<(VertexId, VertexId, f64)>),
    /// Query multiple distances
    QueryDistances(Vec<(VertexId, VertexId)>),
    /// Compute cuts for multiple partitions
    ComputeCuts(Vec<Vec<VertexId>>),
}

/// Result from batch operation
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// Operation type
    pub operation: String,
    /// Number of items processed
    pub items_processed: usize,
    /// Time taken in microseconds
    pub time_us: u64,
    /// Results (for queries)
    pub results: Vec<f64>,
    /// Error message if any
    pub error: Option<String>,
}

/// TypedArray transfer for efficient WASM memory access
///
/// Provides aligned memory buffers for bulk data transfer between
/// JavaScript and WASM.
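///
/// # Example
///
/// An illustrative sketch: stage edges on the Rust side, then expose the raw
/// pointer/length pairs so the JavaScript caller could wrap them in
/// `Float64Array` / `BigUint64Array` views over WASM linear memory.
///
/// ```ignore
/// let mut transfer = TypedArrayTransfer::with_capacity(16);
/// transfer.add_edge(1, 2, 0.5);
/// transfer.add_edge(2, 3, 0.25);
///
/// let (weights_len, vertices_len, _) = transfer.len();
/// let weights_ptr = transfer.f64_ptr();  // e.g. Float64Array(memory.buffer, ptr, weights_len)
/// let vertices_ptr = transfer.u64_ptr(); // e.g. BigUint64Array(memory.buffer, ptr, vertices_len)
/// ```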
#[repr(C, align(64))]
pub struct TypedArrayTransfer {
    /// Float64 buffer for weights/distances
    pub f64_buffer: Vec<f64>,
    /// Uint64 buffer for vertex IDs
    pub u64_buffer: Vec<u64>,
    /// Uint32 buffer for indices/counts
    pub u32_buffer: Vec<u32>,
    /// Byte buffer for raw data
    pub byte_buffer: Vec<u8>,
    /// Current position in buffers
    position: usize,
}

impl TypedArrayTransfer {
    /// Create new transfer with default buffer size
    pub fn new() -> Self {
        Self::with_capacity(1024)
    }

    /// Create with specific capacity
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            f64_buffer: Vec::with_capacity(capacity),
            u64_buffer: Vec::with_capacity(capacity),
            u32_buffer: Vec::with_capacity(capacity * 2),
            byte_buffer: Vec::with_capacity(capacity * 8),
            position: 0,
        }
    }

    /// Reset buffers for reuse
    pub fn reset(&mut self) {
        self.f64_buffer.clear();
        self.u64_buffer.clear();
        self.u32_buffer.clear();
        self.byte_buffer.clear();
        self.position = 0;
    }

    /// Add edge to transfer buffer
    pub fn add_edge(&mut self, source: VertexId, target: VertexId, weight: f64) {
        self.u64_buffer.push(source);
        self.u64_buffer.push(target);
        self.f64_buffer.push(weight);
    }

    /// Add vertex to transfer buffer
    pub fn add_vertex(&mut self, vertex: VertexId) {
        self.u64_buffer.push(vertex);
    }

    /// Add distance result
    pub fn add_distance(&mut self, distance: f64) {
        self.f64_buffer.push(distance);
    }

    /// Get edges from buffer
    pub fn get_edges(&self) -> Vec<(VertexId, VertexId, f64)> {
        let mut edges = Vec::with_capacity(self.f64_buffer.len());

        for (i, &weight) in self.f64_buffer.iter().enumerate() {
            let source = self.u64_buffer.get(i * 2).copied().unwrap_or(0);
            let target = self.u64_buffer.get(i * 2 + 1).copied().unwrap_or(0);
            edges.push((source, target, weight));
        }

        edges
    }

    /// Get f64 buffer as raw pointer (for FFI)
    pub fn f64_ptr(&self) -> *const f64 {
        self.f64_buffer.as_ptr()
    }

    /// Get u64 buffer as raw pointer (for FFI)
    pub fn u64_ptr(&self) -> *const u64 {
        self.u64_buffer.as_ptr()
    }

    /// Get buffer lengths
    pub fn len(&self) -> (usize, usize, usize) {
        (
            self.f64_buffer.len(),
            self.u64_buffer.len(),
            self.u32_buffer.len(),
        )
    }

    /// Check if empty
    pub fn is_empty(&self) -> bool {
        self.f64_buffer.is_empty() && self.u64_buffer.is_empty()
    }
}

impl Default for TypedArrayTransfer {
    fn default() -> Self {
        Self::new()
    }
}

/// WASM batch operations executor
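///
/// # Example
///
/// An illustrative sketch: oversized inputs are split into `max_batch_size`
/// chunks, and aggregate statistics accumulate across `execute_batch` calls.
///
/// ```ignore
/// let mut ops = WasmBatchOps::with_config(BatchConfig {
///     max_batch_size: 2,
///     ..Default::default()
/// });
///
/// ops.queue_insert_edges(vec![(1, 2, 1.0), (2, 3, 1.0), (3, 4, 1.0)]);
/// assert_eq!(ops.pending_count(), 2); // three edges split into two chunks
///
/// let _ = ops.execute_batch();
/// assert_eq!(ops.stats().total_items, 3);
/// ```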
pub struct WasmBatchOps {
    config: BatchConfig,
    /// Transfer buffer
    transfer: TypedArrayTransfer,
    /// Pending operations
    pending: Vec<BatchOperation>,
    /// Statistics
    total_ops: u64,
    total_items: u64,
    total_time_us: u64,
}

impl WasmBatchOps {
    /// Create new batch executor with default config
    pub fn new() -> Self {
        Self::with_config(BatchConfig::default())
    }

    /// Create with custom config
    pub fn with_config(config: BatchConfig) -> Self {
        Self {
            transfer: TypedArrayTransfer::with_capacity(config.buffer_size / 8),
            config,
            pending: Vec::new(),
            total_ops: 0,
            total_items: 0,
            total_time_us: 0,
        }
    }

    /// Queue edge insertions for batch processing
    pub fn queue_insert_edges(&mut self, edges: Vec<(VertexId, VertexId, f64)>) {
        if edges.len() > self.config.max_batch_size {
            // Split into multiple batches
            for chunk in edges.chunks(self.config.max_batch_size) {
                self.pending
                    .push(BatchOperation::InsertEdges(chunk.to_vec()));
            }
        } else {
            self.pending.push(BatchOperation::InsertEdges(edges));
        }
    }

    /// Queue edge deletions for batch processing
    pub fn queue_delete_edges(&mut self, edges: Vec<(VertexId, VertexId)>) {
        if edges.len() > self.config.max_batch_size {
            for chunk in edges.chunks(self.config.max_batch_size) {
                self.pending
                    .push(BatchOperation::DeleteEdges(chunk.to_vec()));
            }
        } else {
            self.pending.push(BatchOperation::DeleteEdges(edges));
        }
    }

    /// Queue distance queries for batch processing
    pub fn queue_distance_queries(&mut self, pairs: Vec<(VertexId, VertexId)>) {
        if pairs.len() > self.config.max_batch_size {
            for chunk in pairs.chunks(self.config.max_batch_size) {
                self.pending
                    .push(BatchOperation::QueryDistances(chunk.to_vec()));
            }
        } else {
            self.pending.push(BatchOperation::QueryDistances(pairs));
        }
    }

    /// Execute all pending operations
    ///
    /// Drains the queue, timing each operation and folding the result counts
    /// into the aggregate statistics.
    pub fn execute_batch(&mut self) -> Vec<BatchResult> {
        // Drain pending operations to avoid a borrow conflict with
        // `execute_operation`.
        let pending_ops: Vec<_> = self.pending.drain(..).collect();
        let mut results = Vec::with_capacity(pending_ops.len());

        for op in pending_ops {
            // Note: `Instant::now()` requires a time shim on wasm32-unknown-unknown;
            // native targets work as-is.
            let op_start = std::time::Instant::now();
            let mut result = self.execute_operation(op);
            let elapsed = op_start.elapsed().as_micros() as u64;
            result.time_us = elapsed;

            self.total_ops += 1;
            self.total_items += result.items_processed as u64;
            self.total_time_us += elapsed;

            results.push(result);
        }

        self.transfer.reset();
        results
    }

    /// Execute a single operation
    fn execute_operation(&mut self, op: BatchOperation) -> BatchResult {
        match op {
            BatchOperation::InsertEdges(edges) => {
                let count = edges.len();

                // Prepare transfer buffer
                self.transfer.reset();
                for (u, v, w) in &edges {
                    self.transfer.add_edge(*u, *v, *w);
                }

                // In WASM, this would call the native insert function
                // For now, we simulate the batch operation
                BatchResult {
                    operation: "InsertEdges".to_string(),
                    items_processed: count,
                    time_us: 0,
                    results: Vec::new(),
                    error: None,
                }
            }

            BatchOperation::DeleteEdges(edges) => {
                let count = edges.len();

                self.transfer.reset();
                for (u, v) in &edges {
                    self.transfer.add_vertex(*u);
                    self.transfer.add_vertex(*v);
                }

                BatchResult {
                    operation: "DeleteEdges".to_string(),
                    items_processed: count,
                    time_us: 0,
                    results: Vec::new(),
                    error: None,
                }
            }

            BatchOperation::UpdateWeights(updates) => {
                let count = updates.len();

                self.transfer.reset();
                for (u, v, w) in &updates {
                    self.transfer.add_edge(*u, *v, *w);
                }

                BatchResult {
                    operation: "UpdateWeights".to_string(),
                    items_processed: count,
                    time_us: 0,
                    results: Vec::new(),
                    error: None,
                }
            }

            BatchOperation::QueryDistances(pairs) => {
                let count = pairs.len();

                self.transfer.reset();
                for (u, v) in &pairs {
                    self.transfer.add_vertex(*u);
                    self.transfer.add_vertex(*v);
                }

                // Simulate distance results
                let results: Vec<f64> = pairs
                    .iter()
                    .map(|(u, v)| if u == v { 0.0 } else { 1.0 })
                    .collect();

                BatchResult {
                    operation: "QueryDistances".to_string(),
                    items_processed: count,
                    time_us: 0,
                    results,
                    error: None,
                }
            }

            BatchOperation::ComputeCuts(partitions) => {
                let count = partitions.len();

                BatchResult {
                    operation: "ComputeCuts".to_string(),
                    items_processed: count,
                    time_us: 0,
                    results: vec![0.0; count],
                    error: None,
                }
            }
        }
    }

    /// Get number of pending operations
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }

    /// Get statistics
    pub fn stats(&self) -> BatchStats {
        BatchStats {
            total_operations: self.total_ops,
            total_items: self.total_items,
            total_time_us: self.total_time_us,
            avg_items_per_op: if self.total_ops > 0 {
                self.total_items as f64 / self.total_ops as f64
            } else {
                0.0
            },
            avg_time_per_item_us: if self.total_items > 0 {
                self.total_time_us as f64 / self.total_items as f64
            } else {
                0.0
            },
        }
    }

    /// Clear pending operations
    pub fn clear(&mut self) {
        self.pending.clear();
        self.transfer.reset();
    }
}

impl Default for WasmBatchOps {
    fn default() -> Self {
        Self::new()
    }
}

/// Statistics for batch operations
#[derive(Debug, Clone, Default)]
pub struct BatchStats {
    /// Total operations executed
    pub total_operations: u64,
    /// Total items processed
    pub total_items: u64,
    /// Total time in microseconds
    pub total_time_us: u64,
    /// Average items per operation
    pub avg_items_per_op: f64,
    /// Average time per item in microseconds
    pub avg_time_per_item_us: f64,
}

/// Pre-allocated WASM memory region
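///
/// # Example
///
/// An illustrative sketch of the bump-allocation pattern: carve out aligned
/// sub-ranges, write through `get_slice_mut`, and reclaim everything at once
/// with `reset`.
///
/// ```ignore
/// let mut region = WasmMemoryRegion::new(4096);
///
/// let offset = region.alloc(256, 64).expect("enough capacity");
/// assert_eq!(offset % 64, 0);
///
/// if let Some(buf) = region.get_slice_mut(offset, 256) {
///     buf.fill(0xAB);
/// }
///
/// region.reset(); // all allocations are released together
/// ```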
#[repr(C, align(64))]
pub struct WasmMemoryRegion {
    /// Raw memory
    data: Vec<u8>,
    /// Capacity in bytes
    capacity: usize,
    /// Current offset
    offset: usize,
}

impl WasmMemoryRegion {
    /// Create new memory region
    pub fn new(size: usize) -> Self {
        // Round up to alignment
        let aligned_size = (size + 63) & !63;
        Self {
            data: vec![0u8; aligned_size],
            capacity: aligned_size,
            offset: 0,
        }
    }

    /// Allocate bytes from region, returns the offset
    ///
    /// Returns the starting offset of the allocated region.
    /// Use `get_slice` to access the allocated memory safely.
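    ///
    /// `align` is assumed to be a power of two. For example, with the cursor
    /// at offset 10 and `align = 8`, the rounded start is
    /// `(10 + 8 - 1) & !(8 - 1) = 16`.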
    pub fn alloc(&mut self, size: usize, align: usize) -> Option<usize> {
        // Bump allocation: round the cursor up to the requested alignment.
        // The bit trick below assumes `align` is a power of two.
        debug_assert!(align.is_power_of_two());
        let aligned_offset = (self.offset + align - 1) & !(align - 1);

        if aligned_offset + size > self.capacity {
            return None;
        }

        let result = aligned_offset;
        self.offset = aligned_offset + size;
        Some(result)
    }

    /// Get a slice at the given offset
    pub fn get_slice(&self, offset: usize, len: usize) -> Option<&[u8]> {
        if offset + len <= self.capacity {
            Some(&self.data[offset..offset + len])
        } else {
            None
        }
    }

    /// Get a mutable slice at the given offset
    pub fn get_slice_mut(&mut self, offset: usize, len: usize) -> Option<&mut [u8]> {
        if offset + len <= self.capacity {
            Some(&mut self.data[offset..offset + len])
        } else {
            None
        }
    }

    /// Reset region for reuse
    pub fn reset(&mut self) {
        self.offset = 0;
        // Optional: zero memory
        // self.data.fill(0);
    }

    /// Get remaining capacity
    pub fn remaining(&self) -> usize {
        self.capacity - self.offset
    }

    /// Get used bytes
    pub fn used(&self) -> usize {
        self.offset
    }

    /// Get raw pointer
    pub fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_typed_array_transfer() {
        let mut transfer = TypedArrayTransfer::new();

        transfer.add_edge(1, 2, 1.0);
        transfer.add_edge(2, 3, 2.0);

        let edges = transfer.get_edges();
        assert_eq!(edges.len(), 2);
        assert_eq!(edges[0], (1, 2, 1.0));
        assert_eq!(edges[1], (2, 3, 2.0));
    }

    #[test]
    fn test_batch_queue() {
        let mut batch = WasmBatchOps::new();

        let edges = vec![(1, 2, 1.0), (2, 3, 2.0)];
        batch.queue_insert_edges(edges);

        assert_eq!(batch.pending_count(), 1);
    }

    #[test]
    fn test_batch_execute() {
        let mut batch = WasmBatchOps::new();

        batch.queue_insert_edges(vec![(1, 2, 1.0)]);
        batch.queue_delete_edges(vec![(3, 4)]);

        let results = batch.execute_batch();

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].operation, "InsertEdges");
        assert_eq!(results[1].operation, "DeleteEdges");
        assert_eq!(batch.pending_count(), 0);
    }

    #[test]
    fn test_batch_splitting() {
        let mut batch = WasmBatchOps::with_config(BatchConfig {
            max_batch_size: 10,
            ..Default::default()
        });

        // Queue 25 edges
        let edges: Vec<_> = (0..25).map(|i| (i, i + 1, 1.0)).collect();
        batch.queue_insert_edges(edges);

        // Should be split into 3 batches
        assert_eq!(batch.pending_count(), 3);
    }

    #[test]
    fn test_distance_queries() {
        let mut batch = WasmBatchOps::new();

        batch.queue_distance_queries(vec![(1, 2), (2, 3), (1, 1)]);

        let results = batch.execute_batch();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].results.len(), 3);
        assert_eq!(results[0].results[2], 0.0); // Same vertex
    }

    #[test]
    fn test_wasm_memory_region() {
        let mut region = WasmMemoryRegion::new(1024);

        // Allocate 64-byte aligned
        let offset1 = region.alloc(100, 64);
        assert!(offset1.is_some());
        assert_eq!(offset1.unwrap() % 64, 0);

        let offset2 = region.alloc(200, 64);
        assert!(offset2.is_some());

        // Verify we can get slices
        let slice1 = region.get_slice(offset1.unwrap(), 100);
        assert!(slice1.is_some());

        assert!(region.used() > 0);
        assert!(region.remaining() < 1024);

        region.reset();
        assert_eq!(region.used(), 0);
    }

    #[test]
    fn test_batch_stats() {
        let mut batch = WasmBatchOps::new();

        batch.queue_insert_edges(vec![(1, 2, 1.0), (2, 3, 2.0)]);
        let _ = batch.execute_batch();

        let stats = batch.stats();
        assert_eq!(stats.total_operations, 1);
        assert_eq!(stats.total_items, 2);
    }

    #[test]
    fn test_transfer_reset() {
        let mut transfer = TypedArrayTransfer::new();

        transfer.add_edge(1, 2, 1.0);
        assert!(!transfer.is_empty());

        transfer.reset();
        assert!(transfer.is_empty());
    }
}