// ruvector_mincut/optimization/wasm_batch.rs
1//! WASM Batch Operations and TypedArray Optimizations
2//!
3//! Optimizations specific to WebAssembly execution:
4//! - Batch FFI calls to minimize overhead
5//! - Pre-allocated WASM memory
6//! - TypedArray bulk transfers
7//! - Memory alignment for SIMD
8//!
9//! Target: 10x reduction in FFI overhead
10
11use crate::graph::VertexId;
12use crate::time_compat::PortableInstant;
13use std::collections::HashMap;
14
/// Tuning knobs for WASM batch operations.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Largest number of items placed in a single batch
    pub max_batch_size: usize,
    /// Size of the pre-allocated transfer buffer, in bytes
    pub buffer_size: usize,
    /// Byte alignment used for SIMD-friendly allocations
    pub alignment: usize,
    /// Whether memory pooling is enabled
    pub memory_pooling: bool,
}

impl Default for BatchConfig {
    /// Defaults: 1024-item batches, a 64 KiB buffer, AVX-512 (64-byte)
    /// alignment, and pooling enabled.
    fn default() -> Self {
        BatchConfig {
            max_batch_size: 1024,
            buffer_size: 65536, // 64 KiB
            alignment: 64,      // AVX-512 cache-line alignment
            memory_pooling: true,
        }
    }
}
38
/// Batch operation types for minimizing FFI calls
///
/// Each variant carries the full payload for one batched call, so a single
/// boundary crossing can service many logical operations.
#[derive(Debug, Clone)]
pub enum BatchOperation {
    /// Insert multiple edges as (source, target, weight) triples
    InsertEdges(Vec<(VertexId, VertexId, f64)>),
    /// Delete multiple edges identified by (source, target) pairs
    DeleteEdges(Vec<(VertexId, VertexId)>),
    /// Update multiple weights via (source, target, new_weight) triples
    UpdateWeights(Vec<(VertexId, VertexId, f64)>),
    /// Query multiple distances between (source, target) pairs
    QueryDistances(Vec<(VertexId, VertexId)>),
    /// Compute cuts for multiple partitions (one vertex set per partition)
    ComputeCuts(Vec<Vec<VertexId>>),
}
53
/// Result from batch operation
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// Operation type name (e.g. "InsertEdges", "QueryDistances")
    pub operation: String,
    /// Number of items processed in this batch
    pub items_processed: usize,
    /// Wall-clock time for the operation, in microseconds
    pub time_us: u64,
    /// Results (for queries); empty for mutation-only operations
    pub results: Vec<f64>,
    /// Error message if any; `None` on success
    pub error: Option<String>,
}
68
/// TypedArray transfer for efficient WASM memory access
///
/// Provides aligned memory buffers for bulk data transfer between
/// JavaScript and WASM.
///
/// NOTE(review): `align(64)` aligns the struct itself; the `Vec` heap
/// allocations it owns follow the global allocator's alignment, so
/// 64-byte alignment of the buffer *contents* is not guaranteed here —
/// confirm the SIMD-alignment intent.
#[repr(C, align(64))]
pub struct TypedArrayTransfer {
    /// Float64 buffer for weights/distances
    pub f64_buffer: Vec<f64>,
    /// Uint64 buffer for vertex IDs; edges are stored as consecutive
    /// (source, target) pairs
    pub u64_buffer: Vec<u64>,
    /// Uint32 buffer for indices/counts
    pub u32_buffer: Vec<u32>,
    /// Byte buffer for raw data
    pub byte_buffer: Vec<u8>,
    /// Current position in buffers (only reset/cleared in visible code)
    position: usize,
}
86
87impl TypedArrayTransfer {
88    /// Create new transfer with default buffer size
89    pub fn new() -> Self {
90        Self::with_capacity(1024)
91    }
92
93    /// Create with specific capacity
94    pub fn with_capacity(capacity: usize) -> Self {
95        Self {
96            f64_buffer: Vec::with_capacity(capacity),
97            u64_buffer: Vec::with_capacity(capacity),
98            u32_buffer: Vec::with_capacity(capacity * 2),
99            byte_buffer: Vec::with_capacity(capacity * 8),
100            position: 0,
101        }
102    }
103
104    /// Reset buffers for reuse
105    pub fn reset(&mut self) {
106        self.f64_buffer.clear();
107        self.u64_buffer.clear();
108        self.u32_buffer.clear();
109        self.byte_buffer.clear();
110        self.position = 0;
111    }
112
113    /// Add edge to transfer buffer
114    pub fn add_edge(&mut self, source: VertexId, target: VertexId, weight: f64) {
115        self.u64_buffer.push(source);
116        self.u64_buffer.push(target);
117        self.f64_buffer.push(weight);
118    }
119
120    /// Add vertex to transfer buffer
121    pub fn add_vertex(&mut self, vertex: VertexId) {
122        self.u64_buffer.push(vertex);
123    }
124
125    /// Add distance result
126    pub fn add_distance(&mut self, distance: f64) {
127        self.f64_buffer.push(distance);
128    }
129
130    /// Get edges from buffer
131    pub fn get_edges(&self) -> Vec<(VertexId, VertexId, f64)> {
132        let mut edges = Vec::with_capacity(self.f64_buffer.len());
133
134        for (i, &weight) in self.f64_buffer.iter().enumerate() {
135            let source = self.u64_buffer.get(i * 2).copied().unwrap_or(0);
136            let target = self.u64_buffer.get(i * 2 + 1).copied().unwrap_or(0);
137            edges.push((source, target, weight));
138        }
139
140        edges
141    }
142
143    /// Get f64 buffer as raw pointer (for FFI)
144    pub fn f64_ptr(&self) -> *const f64 {
145        self.f64_buffer.as_ptr()
146    }
147
148    /// Get u64 buffer as raw pointer (for FFI)
149    pub fn u64_ptr(&self) -> *const u64 {
150        self.u64_buffer.as_ptr()
151    }
152
153    /// Get buffer lengths
154    pub fn len(&self) -> (usize, usize, usize) {
155        (
156            self.f64_buffer.len(),
157            self.u64_buffer.len(),
158            self.u32_buffer.len(),
159        )
160    }
161
162    /// Check if empty
163    pub fn is_empty(&self) -> bool {
164        self.f64_buffer.is_empty() && self.u64_buffer.is_empty()
165    }
166}
167
168impl Default for TypedArrayTransfer {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
/// WASM batch operations executor
///
/// Accumulates operations in a pending queue and flushes them in one pass
/// via `execute_batch`, amortizing FFI overhead across many items.
pub struct WasmBatchOps {
    /// Batching configuration (batch size, buffer size, alignment, pooling)
    config: BatchConfig,
    /// Transfer buffer reused across operations
    transfer: TypedArrayTransfer,
    /// Pending operations queued for the next `execute_batch` call
    pending: Vec<BatchOperation>,
    /// Statistics: number of operations executed so far
    total_ops: u64,
    /// Statistics: total items processed across all operations
    total_items: u64,
    /// Statistics: cumulative execution time, in microseconds
    total_time_us: u64,
}
186
187impl WasmBatchOps {
188    /// Create new batch executor with default config
189    pub fn new() -> Self {
190        Self::with_config(BatchConfig::default())
191    }
192
193    /// Create with custom config
194    pub fn with_config(config: BatchConfig) -> Self {
195        Self {
196            transfer: TypedArrayTransfer::with_capacity(config.buffer_size / 8),
197            config,
198            pending: Vec::new(),
199            total_ops: 0,
200            total_items: 0,
201            total_time_us: 0,
202        }
203    }
204
205    /// Queue edge insertions for batch processing
206    pub fn queue_insert_edges(&mut self, edges: Vec<(VertexId, VertexId, f64)>) {
207        if edges.len() > self.config.max_batch_size {
208            // Split into multiple batches
209            for chunk in edges.chunks(self.config.max_batch_size) {
210                self.pending
211                    .push(BatchOperation::InsertEdges(chunk.to_vec()));
212            }
213        } else {
214            self.pending.push(BatchOperation::InsertEdges(edges));
215        }
216    }
217
218    /// Queue edge deletions for batch processing
219    pub fn queue_delete_edges(&mut self, edges: Vec<(VertexId, VertexId)>) {
220        if edges.len() > self.config.max_batch_size {
221            for chunk in edges.chunks(self.config.max_batch_size) {
222                self.pending
223                    .push(BatchOperation::DeleteEdges(chunk.to_vec()));
224            }
225        } else {
226            self.pending.push(BatchOperation::DeleteEdges(edges));
227        }
228    }
229
230    /// Queue distance queries for batch processing
231    pub fn queue_distance_queries(&mut self, pairs: Vec<(VertexId, VertexId)>) {
232        if pairs.len() > self.config.max_batch_size {
233            for chunk in pairs.chunks(self.config.max_batch_size) {
234                self.pending
235                    .push(BatchOperation::QueryDistances(chunk.to_vec()));
236            }
237        } else {
238            self.pending.push(BatchOperation::QueryDistances(pairs));
239        }
240    }
241
242    /// Execute all pending operations
243    pub fn execute_batch(&mut self) -> Vec<BatchResult> {
244        let _start = PortableInstant::now();
245
246        // Drain pending operations to avoid borrow conflict
247        let pending_ops: Vec<_> = self.pending.drain(..).collect();
248        let mut results = Vec::with_capacity(pending_ops.len());
249
250        for op in pending_ops {
251            let op_start = PortableInstant::now();
252            let result = self.execute_operation(op);
253            let elapsed = op_start.elapsed().as_micros() as u64;
254
255            self.total_ops += 1;
256            self.total_items += result.items_processed as u64;
257            self.total_time_us += elapsed;
258
259            results.push(result);
260        }
261
262        self.transfer.reset();
263        results
264    }
265
266    /// Execute a single operation
267    fn execute_operation(&mut self, op: BatchOperation) -> BatchResult {
268        match op {
269            BatchOperation::InsertEdges(edges) => {
270                let count = edges.len();
271
272                // Prepare transfer buffer
273                self.transfer.reset();
274                for (u, v, w) in &edges {
275                    self.transfer.add_edge(*u, *v, *w);
276                }
277
278                // In WASM, this would call the native insert function
279                // For now, we simulate the batch operation
280                BatchResult {
281                    operation: "InsertEdges".to_string(),
282                    items_processed: count,
283                    time_us: 0,
284                    results: Vec::new(),
285                    error: None,
286                }
287            }
288
289            BatchOperation::DeleteEdges(edges) => {
290                let count = edges.len();
291
292                self.transfer.reset();
293                for (u, v) in &edges {
294                    self.transfer.add_vertex(*u);
295                    self.transfer.add_vertex(*v);
296                }
297
298                BatchResult {
299                    operation: "DeleteEdges".to_string(),
300                    items_processed: count,
301                    time_us: 0,
302                    results: Vec::new(),
303                    error: None,
304                }
305            }
306
307            BatchOperation::UpdateWeights(updates) => {
308                let count = updates.len();
309
310                self.transfer.reset();
311                for (u, v, w) in &updates {
312                    self.transfer.add_edge(*u, *v, *w);
313                }
314
315                BatchResult {
316                    operation: "UpdateWeights".to_string(),
317                    items_processed: count,
318                    time_us: 0,
319                    results: Vec::new(),
320                    error: None,
321                }
322            }
323
324            BatchOperation::QueryDistances(pairs) => {
325                let count = pairs.len();
326
327                self.transfer.reset();
328                for (u, v) in &pairs {
329                    self.transfer.add_vertex(*u);
330                    self.transfer.add_vertex(*v);
331                }
332
333                // Simulate distance results
334                let results: Vec<f64> = pairs
335                    .iter()
336                    .map(|(u, v)| if u == v { 0.0 } else { 1.0 })
337                    .collect();
338
339                BatchResult {
340                    operation: "QueryDistances".to_string(),
341                    items_processed: count,
342                    time_us: 0,
343                    results,
344                    error: None,
345                }
346            }
347
348            BatchOperation::ComputeCuts(partitions) => {
349                let count = partitions.len();
350
351                BatchResult {
352                    operation: "ComputeCuts".to_string(),
353                    items_processed: count,
354                    time_us: 0,
355                    results: vec![0.0; count],
356                    error: None,
357                }
358            }
359        }
360    }
361
362    /// Get number of pending operations
363    pub fn pending_count(&self) -> usize {
364        self.pending.len()
365    }
366
367    /// Get statistics
368    pub fn stats(&self) -> BatchStats {
369        BatchStats {
370            total_operations: self.total_ops,
371            total_items: self.total_items,
372            total_time_us: self.total_time_us,
373            avg_items_per_op: if self.total_ops > 0 {
374                self.total_items as f64 / self.total_ops as f64
375            } else {
376                0.0
377            },
378            avg_time_per_item_us: if self.total_items > 0 {
379                self.total_time_us as f64 / self.total_items as f64
380            } else {
381                0.0
382            },
383        }
384    }
385
386    /// Clear pending operations
387    pub fn clear(&mut self) {
388        self.pending.clear();
389        self.transfer.reset();
390    }
391}
392
393impl Default for WasmBatchOps {
394    fn default() -> Self {
395        Self::new()
396    }
397}
398
/// Statistics for batch operations
///
/// Snapshot of cumulative counters; produced by `WasmBatchOps::stats`.
#[derive(Debug, Clone, Default)]
pub struct BatchStats {
    /// Total operations executed
    pub total_operations: u64,
    /// Total items processed across all operations
    pub total_items: u64,
    /// Total time in microseconds
    pub total_time_us: u64,
    /// Average items per operation (0.0 when no operations ran)
    pub avg_items_per_op: f64,
    /// Average time per item in microseconds (0.0 when no items processed)
    pub avg_time_per_item_us: f64,
}
413
/// Pre-allocated WASM memory region
///
/// A simple bump allocator over a fixed byte arena: `alloc` hands out
/// aligned offsets, `get_slice`/`get_slice_mut` give checked access, and
/// `reset` reclaims everything at once.
///
/// NOTE(review): `align(64)` aligns the struct itself; the `Vec<u8>` heap
/// allocation follows the global allocator's alignment, so offset 0 of
/// `data` is not guaranteed to be 64-byte aligned — confirm SIMD intent.
#[repr(C, align(64))]
pub struct WasmMemoryRegion {
    /// Raw memory backing the region (length equals `capacity`)
    data: Vec<u8>,
    /// Capacity in bytes, rounded up to a multiple of 64
    capacity: usize,
    /// Bump pointer: offset of the next free byte
    offset: usize,
}

impl WasmMemoryRegion {
    /// Create new memory region of at least `size` bytes.
    ///
    /// The size is rounded up to the next multiple of 64 so 64-byte-aligned
    /// allocations can use the full capacity.
    pub fn new(size: usize) -> Self {
        // Round up to the 64-byte alignment unit.
        let aligned_size = (size + 63) & !63;
        Self {
            data: vec![0u8; aligned_size],
            capacity: aligned_size,
            offset: 0,
        }
    }

    /// Allocate `size` bytes aligned to `align`, returning the offset.
    ///
    /// Returns `None` when `align` is not a nonzero power of two or when the
    /// region does not have enough remaining space. Use `get_slice` to
    /// access the allocated memory safely.
    ///
    /// Fix: previously `align == 0` (or a non-power-of-two `align`) caused
    /// wrapping arithmetic in the mask computation (panic in debug builds,
    /// garbage offsets in release), and `aligned_offset + size` could
    /// overflow; alignment is now validated and all arithmetic is checked.
    pub fn alloc(&mut self, size: usize, align: usize) -> Option<usize> {
        // is_power_of_two() is false for 0, so this also rejects align == 0.
        if !align.is_power_of_two() {
            return None;
        }

        // Round the bump pointer up to the requested alignment, with
        // overflow-checked arithmetic.
        let aligned_offset = self.offset.checked_add(align - 1)? & !(align - 1);
        let end = aligned_offset.checked_add(size)?;

        if end > self.capacity {
            return None;
        }

        self.offset = end;
        Some(aligned_offset)
    }

    /// Get a slice at the given offset, or `None` if out of bounds.
    pub fn get_slice(&self, offset: usize, len: usize) -> Option<&[u8]> {
        // checked_add guards against `offset + len` wrapping around.
        let end = offset.checked_add(len)?;
        self.data.get(offset..end)
    }

    /// Get a mutable slice at the given offset, or `None` if out of bounds.
    pub fn get_slice_mut(&mut self, offset: usize, len: usize) -> Option<&mut [u8]> {
        let end = offset.checked_add(len)?;
        self.data.get_mut(offset..end)
    }

    /// Reset region for reuse (memory contents are not zeroed).
    pub fn reset(&mut self) {
        self.offset = 0;
        // Optional: zero memory
        // self.data.fill(0);
    }

    /// Get remaining capacity in bytes.
    pub fn remaining(&self) -> usize {
        self.capacity - self.offset
    }

    /// Get used bytes.
    pub fn used(&self) -> usize {
        self.offset
    }

    /// Get raw pointer to the start of the region.
    pub fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }
}
494
#[cfg(test)]
mod tests {
    use super::*;

    // Edges added via add_edge round-trip through get_edges in order.
    #[test]
    fn test_typed_array_transfer() {
        let mut transfer = TypedArrayTransfer::new();

        transfer.add_edge(1, 2, 1.0);
        transfer.add_edge(2, 3, 2.0);

        let edges = transfer.get_edges();
        assert_eq!(edges.len(), 2);
        assert_eq!(edges[0], (1, 2, 1.0));
        assert_eq!(edges[1], (2, 3, 2.0));
    }

    // A small edge list fits in a single pending operation.
    #[test]
    fn test_batch_queue() {
        let mut batch = WasmBatchOps::new();

        let edges = vec![(1, 2, 1.0), (2, 3, 2.0)];
        batch.queue_insert_edges(edges);

        assert_eq!(batch.pending_count(), 1);
    }

    // Execution drains the queue and reports one result per operation,
    // preserving queue order.
    #[test]
    fn test_batch_execute() {
        let mut batch = WasmBatchOps::new();

        batch.queue_insert_edges(vec![(1, 2, 1.0)]);
        batch.queue_delete_edges(vec![(3, 4)]);

        let results = batch.execute_batch();

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].operation, "InsertEdges");
        assert_eq!(results[1].operation, "DeleteEdges");
        assert_eq!(batch.pending_count(), 0);
    }

    // Inputs larger than max_batch_size are split into ceil(n / max) chunks.
    #[test]
    fn test_batch_splitting() {
        let mut batch = WasmBatchOps::with_config(BatchConfig {
            max_batch_size: 10,
            ..Default::default()
        });

        // Queue 25 edges
        let edges: Vec<_> = (0..25).map(|i| (i, i + 1, 1.0)).collect();
        batch.queue_insert_edges(edges);

        // Should be split into 3 batches
        assert_eq!(batch.pending_count(), 3);
    }

    // Simulated distance queries: identical endpoints yield 0.0.
    #[test]
    fn test_distance_queries() {
        let mut batch = WasmBatchOps::new();

        batch.queue_distance_queries(vec![(1, 2), (2, 3), (1, 1)]);

        let results = batch.execute_batch();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].results.len(), 3);
        assert_eq!(results[0].results[2], 0.0); // Same vertex
    }

    // Bump allocation respects alignment; reset reclaims everything.
    #[test]
    fn test_wasm_memory_region() {
        let mut region = WasmMemoryRegion::new(1024);

        // Allocate 64-byte aligned
        let offset1 = region.alloc(100, 64);
        assert!(offset1.is_some());
        assert_eq!(offset1.unwrap() % 64, 0);

        let offset2 = region.alloc(200, 64);
        assert!(offset2.is_some());

        // Verify we can get slices
        let slice1 = region.get_slice(offset1.unwrap(), 100);
        assert!(slice1.is_some());

        assert!(region.used() > 0);
        assert!(region.remaining() < 1024);

        region.reset();
        assert_eq!(region.used(), 0);
    }

    // Counters accumulate per operation and per item.
    #[test]
    fn test_batch_stats() {
        let mut batch = WasmBatchOps::new();

        batch.queue_insert_edges(vec![(1, 2, 1.0), (2, 3, 2.0)]);
        let _ = batch.execute_batch();

        let stats = batch.stats();
        assert_eq!(stats.total_operations, 1);
        assert_eq!(stats.total_items, 2);
    }

    // reset() returns the transfer to the empty state.
    #[test]
    fn test_transfer_reset() {
        let mut transfer = TypedArrayTransfer::new();

        transfer.add_edge(1, 2, 1.0);
        assert!(!transfer.is_empty());

        transfer.reset();
        assert!(transfer.is_empty());
    }
}