1use crate::graph::VertexId;
12use std::collections::HashMap;
13
/// Tuning knobs for batching operations across the WASM boundary.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of items per queued operation; larger inputs are
    /// split into chunks of at most this size (see the `queue_*` methods).
    pub max_batch_size: usize,
    /// Transfer buffer size in bytes; `WasmBatchOps::with_config` pre-sizes
    /// the `TypedArrayTransfer` from this (bytes / 8 entries).
    pub buffer_size: usize,
    /// Alignment hint in bytes. NOTE(review): not read anywhere in this
    /// file — presumably consumed by callers; confirm before relying on it.
    pub alignment: usize,
    /// Memory-pooling flag. NOTE(review): not read anywhere in this file.
    pub memory_pooling: bool,
}
26
27impl Default for BatchConfig {
28 fn default() -> Self {
29 Self {
30 max_batch_size: 1024,
31 buffer_size: 64 * 1024, alignment: 64, memory_pooling: true,
34 }
35 }
36}
37
/// A queued graph mutation or query, held until `WasmBatchOps::execute_batch`
/// drains the pending list.
#[derive(Debug, Clone)]
pub enum BatchOperation {
    /// Edges to insert, as (source, target, weight) triples.
    InsertEdges(Vec<(VertexId, VertexId, f64)>),
    /// Edges to delete, as (source, target) pairs.
    DeleteEdges(Vec<(VertexId, VertexId)>),
    /// Edge weight updates, as (source, target, new_weight) triples.
    UpdateWeights(Vec<(VertexId, VertexId, f64)>),
    /// Distance queries between vertex pairs.
    QueryDistances(Vec<(VertexId, VertexId)>),
    /// Cut computations, one per vertex partition.
    ComputeCuts(Vec<Vec<VertexId>>),
}
52
/// Outcome of a single executed `BatchOperation`.
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// Name of the executed operation variant (e.g. "InsertEdges").
    pub operation: String,
    /// Number of items the operation covered.
    pub items_processed: usize,
    /// Execution time in microseconds for this operation.
    pub time_us: u64,
    /// Numeric results, when the operation produces any (e.g. distances);
    /// empty for pure mutations.
    pub results: Vec<f64>,
    /// Error description, if the operation failed; `None` on success.
    pub error: Option<String>,
}
67
/// Staging buffers for typed-array transfer across the WASM boundary; raw
/// base pointers are exposed via `f64_ptr` / `u64_ptr`.
///
/// NOTE(review): `align(64)` aligns this struct itself, not the heap
/// allocations behind the `Vec`s — confirm callers do not assume the
/// buffers' data pointers are 64-byte aligned.
#[repr(C, align(64))]
pub struct TypedArrayTransfer {
    /// f64 payloads (edge weights, distances).
    pub f64_buffer: Vec<f64>,
    /// u64 payloads (vertex ids; each edge occupies two consecutive slots).
    pub u64_buffer: Vec<u64>,
    /// u32 payloads. NOTE(review): never written in this file.
    pub u32_buffer: Vec<u32>,
    /// Raw byte payloads. NOTE(review): never written in this file.
    pub byte_buffer: Vec<u8>,
    /// Cursor field; reset to 0 by `reset` but never advanced in this file.
    position: usize,
}
85
86impl TypedArrayTransfer {
87 pub fn new() -> Self {
89 Self::with_capacity(1024)
90 }
91
92 pub fn with_capacity(capacity: usize) -> Self {
94 Self {
95 f64_buffer: Vec::with_capacity(capacity),
96 u64_buffer: Vec::with_capacity(capacity),
97 u32_buffer: Vec::with_capacity(capacity * 2),
98 byte_buffer: Vec::with_capacity(capacity * 8),
99 position: 0,
100 }
101 }
102
103 pub fn reset(&mut self) {
105 self.f64_buffer.clear();
106 self.u64_buffer.clear();
107 self.u32_buffer.clear();
108 self.byte_buffer.clear();
109 self.position = 0;
110 }
111
112 pub fn add_edge(&mut self, source: VertexId, target: VertexId, weight: f64) {
114 self.u64_buffer.push(source);
115 self.u64_buffer.push(target);
116 self.f64_buffer.push(weight);
117 }
118
119 pub fn add_vertex(&mut self, vertex: VertexId) {
121 self.u64_buffer.push(vertex);
122 }
123
124 pub fn add_distance(&mut self, distance: f64) {
126 self.f64_buffer.push(distance);
127 }
128
129 pub fn get_edges(&self) -> Vec<(VertexId, VertexId, f64)> {
131 let mut edges = Vec::with_capacity(self.f64_buffer.len());
132
133 for (i, &weight) in self.f64_buffer.iter().enumerate() {
134 let source = self.u64_buffer.get(i * 2).copied().unwrap_or(0);
135 let target = self.u64_buffer.get(i * 2 + 1).copied().unwrap_or(0);
136 edges.push((source, target, weight));
137 }
138
139 edges
140 }
141
142 pub fn f64_ptr(&self) -> *const f64 {
144 self.f64_buffer.as_ptr()
145 }
146
147 pub fn u64_ptr(&self) -> *const u64 {
149 self.u64_buffer.as_ptr()
150 }
151
152 pub fn len(&self) -> (usize, usize, usize) {
154 (
155 self.f64_buffer.len(),
156 self.u64_buffer.len(),
157 self.u32_buffer.len(),
158 )
159 }
160
161 pub fn is_empty(&self) -> bool {
163 self.f64_buffer.is_empty() && self.u64_buffer.is_empty()
164 }
165}
166
167impl Default for TypedArrayTransfer {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
/// Queues graph operations and executes them in batches, staging payloads
/// in a shared `TypedArrayTransfer` and accumulating running statistics.
pub struct WasmBatchOps {
    /// Batching configuration (chunk size, buffer sizing).
    config: BatchConfig,
    /// Staging buffers, reset before each operation and after each batch.
    transfer: TypedArrayTransfer,
    /// Operations queued but not yet executed.
    pending: Vec<BatchOperation>,
    /// Lifetime count of executed operations.
    total_ops: u64,
    /// Lifetime count of items across all executed operations.
    total_items: u64,
    /// Lifetime execution time in microseconds.
    total_time_us: u64,
}
185
186impl WasmBatchOps {
187 pub fn new() -> Self {
189 Self::with_config(BatchConfig::default())
190 }
191
192 pub fn with_config(config: BatchConfig) -> Self {
194 Self {
195 transfer: TypedArrayTransfer::with_capacity(config.buffer_size / 8),
196 config,
197 pending: Vec::new(),
198 total_ops: 0,
199 total_items: 0,
200 total_time_us: 0,
201 }
202 }
203
204 pub fn queue_insert_edges(&mut self, edges: Vec<(VertexId, VertexId, f64)>) {
206 if edges.len() > self.config.max_batch_size {
207 for chunk in edges.chunks(self.config.max_batch_size) {
209 self.pending
210 .push(BatchOperation::InsertEdges(chunk.to_vec()));
211 }
212 } else {
213 self.pending.push(BatchOperation::InsertEdges(edges));
214 }
215 }
216
217 pub fn queue_delete_edges(&mut self, edges: Vec<(VertexId, VertexId)>) {
219 if edges.len() > self.config.max_batch_size {
220 for chunk in edges.chunks(self.config.max_batch_size) {
221 self.pending
222 .push(BatchOperation::DeleteEdges(chunk.to_vec()));
223 }
224 } else {
225 self.pending.push(BatchOperation::DeleteEdges(edges));
226 }
227 }
228
229 pub fn queue_distance_queries(&mut self, pairs: Vec<(VertexId, VertexId)>) {
231 if pairs.len() > self.config.max_batch_size {
232 for chunk in pairs.chunks(self.config.max_batch_size) {
233 self.pending
234 .push(BatchOperation::QueryDistances(chunk.to_vec()));
235 }
236 } else {
237 self.pending.push(BatchOperation::QueryDistances(pairs));
238 }
239 }
240
241 pub fn execute_batch(&mut self) -> Vec<BatchResult> {
243 let _start = std::time::Instant::now();
244
245 let pending_ops: Vec<_> = self.pending.drain(..).collect();
247 let mut results = Vec::with_capacity(pending_ops.len());
248
249 for op in pending_ops {
250 let op_start = std::time::Instant::now();
251 let result = self.execute_operation(op);
252 let elapsed = op_start.elapsed().as_micros() as u64;
253
254 self.total_ops += 1;
255 self.total_items += result.items_processed as u64;
256 self.total_time_us += elapsed;
257
258 results.push(result);
259 }
260
261 self.transfer.reset();
262 results
263 }
264
265 fn execute_operation(&mut self, op: BatchOperation) -> BatchResult {
267 match op {
268 BatchOperation::InsertEdges(edges) => {
269 let count = edges.len();
270
271 self.transfer.reset();
273 for (u, v, w) in &edges {
274 self.transfer.add_edge(*u, *v, *w);
275 }
276
277 BatchResult {
280 operation: "InsertEdges".to_string(),
281 items_processed: count,
282 time_us: 0,
283 results: Vec::new(),
284 error: None,
285 }
286 }
287
288 BatchOperation::DeleteEdges(edges) => {
289 let count = edges.len();
290
291 self.transfer.reset();
292 for (u, v) in &edges {
293 self.transfer.add_vertex(*u);
294 self.transfer.add_vertex(*v);
295 }
296
297 BatchResult {
298 operation: "DeleteEdges".to_string(),
299 items_processed: count,
300 time_us: 0,
301 results: Vec::new(),
302 error: None,
303 }
304 }
305
306 BatchOperation::UpdateWeights(updates) => {
307 let count = updates.len();
308
309 self.transfer.reset();
310 for (u, v, w) in &updates {
311 self.transfer.add_edge(*u, *v, *w);
312 }
313
314 BatchResult {
315 operation: "UpdateWeights".to_string(),
316 items_processed: count,
317 time_us: 0,
318 results: Vec::new(),
319 error: None,
320 }
321 }
322
323 BatchOperation::QueryDistances(pairs) => {
324 let count = pairs.len();
325
326 self.transfer.reset();
327 for (u, v) in &pairs {
328 self.transfer.add_vertex(*u);
329 self.transfer.add_vertex(*v);
330 }
331
332 let results: Vec<f64> = pairs
334 .iter()
335 .map(|(u, v)| if u == v { 0.0 } else { 1.0 })
336 .collect();
337
338 BatchResult {
339 operation: "QueryDistances".to_string(),
340 items_processed: count,
341 time_us: 0,
342 results,
343 error: None,
344 }
345 }
346
347 BatchOperation::ComputeCuts(partitions) => {
348 let count = partitions.len();
349
350 BatchResult {
351 operation: "ComputeCuts".to_string(),
352 items_processed: count,
353 time_us: 0,
354 results: vec![0.0; count],
355 error: None,
356 }
357 }
358 }
359 }
360
361 pub fn pending_count(&self) -> usize {
363 self.pending.len()
364 }
365
366 pub fn stats(&self) -> BatchStats {
368 BatchStats {
369 total_operations: self.total_ops,
370 total_items: self.total_items,
371 total_time_us: self.total_time_us,
372 avg_items_per_op: if self.total_ops > 0 {
373 self.total_items as f64 / self.total_ops as f64
374 } else {
375 0.0
376 },
377 avg_time_per_item_us: if self.total_items > 0 {
378 self.total_time_us as f64 / self.total_items as f64
379 } else {
380 0.0
381 },
382 }
383 }
384
385 pub fn clear(&mut self) {
387 self.pending.clear();
388 self.transfer.reset();
389 }
390}
391
392impl Default for WasmBatchOps {
393 fn default() -> Self {
394 Self::new()
395 }
396}
397
/// Aggregate execution statistics reported by `WasmBatchOps::stats`.
#[derive(Debug, Clone, Default)]
pub struct BatchStats {
    /// Operations executed since creation.
    pub total_operations: u64,
    /// Items processed across all executed operations.
    pub total_items: u64,
    /// Total execution time in microseconds.
    pub total_time_us: u64,
    /// total_items / total_operations; 0.0 when no operations have run.
    pub avg_items_per_op: f64,
    /// total_time_us / total_items; 0.0 when no items have been processed.
    pub avg_time_per_item_us: f64,
}
412
/// Fixed-size byte region with bump allocation; exposes a raw base pointer
/// via `as_ptr` for use across the WASM boundary.
///
/// NOTE(review): `align(64)` aligns this struct itself, not the heap buffer
/// behind `data` — confirm any alignment assumption on `as_ptr`'s result.
#[repr(C, align(64))]
pub struct WasmMemoryRegion {
    /// Backing storage, zero-filled at construction.
    data: Vec<u8>,
    /// Total usable bytes (size rounded up to a multiple of 64 in `new`).
    capacity: usize,
    /// Current bump offset; bytes below this are allocated.
    offset: usize,
}
423
424impl WasmMemoryRegion {
425 pub fn new(size: usize) -> Self {
427 let aligned_size = (size + 63) & !63;
429 Self {
430 data: vec![0u8; aligned_size],
431 capacity: aligned_size,
432 offset: 0,
433 }
434 }
435
436 pub fn alloc(&mut self, size: usize, align: usize) -> Option<usize> {
441 let aligned_offset = (self.offset + align - 1) & !(align - 1);
443
444 if aligned_offset + size > self.capacity {
445 return None;
446 }
447
448 let result = aligned_offset;
449 self.offset = aligned_offset + size;
450 Some(result)
451 }
452
453 pub fn get_slice(&self, offset: usize, len: usize) -> Option<&[u8]> {
455 if offset + len <= self.capacity {
456 Some(&self.data[offset..offset + len])
457 } else {
458 None
459 }
460 }
461
462 pub fn get_slice_mut(&mut self, offset: usize, len: usize) -> Option<&mut [u8]> {
464 if offset + len <= self.capacity {
465 Some(&mut self.data[offset..offset + len])
466 } else {
467 None
468 }
469 }
470
471 pub fn reset(&mut self) {
473 self.offset = 0;
474 }
477
478 pub fn remaining(&self) -> usize {
480 self.capacity - self.offset
481 }
482
483 pub fn used(&self) -> usize {
485 self.offset
486 }
487
488 pub fn as_ptr(&self) -> *const u8 {
490 self.data.as_ptr()
491 }
492}
493
#[cfg(test)]
mod tests {
    use super::*;

    // Edges staged with add_edge round-trip through get_edges.
    #[test]
    fn test_typed_array_transfer() {
        let mut xfer = TypedArrayTransfer::new();
        xfer.add_edge(1, 2, 1.0);
        xfer.add_edge(2, 3, 2.0);

        let edges = xfer.get_edges();
        assert_eq!(edges.len(), 2);
        assert_eq!(edges[0], (1, 2, 1.0));
        assert_eq!(edges[1], (2, 3, 2.0));
    }

    // A small edge list queues as a single pending operation.
    #[test]
    fn test_batch_queue() {
        let mut ops = WasmBatchOps::new();
        ops.queue_insert_edges(vec![(1, 2, 1.0), (2, 3, 2.0)]);
        assert_eq!(ops.pending_count(), 1);
    }

    // Executing drains the queue and yields one result per operation,
    // labeled and in queue order.
    #[test]
    fn test_batch_execute() {
        let mut ops = WasmBatchOps::new();
        ops.queue_insert_edges(vec![(1, 2, 1.0)]);
        ops.queue_delete_edges(vec![(3, 4)]);

        let outcomes = ops.execute_batch();
        assert_eq!(outcomes.len(), 2);
        assert_eq!(outcomes[0].operation, "InsertEdges");
        assert_eq!(outcomes[1].operation, "DeleteEdges");
        assert_eq!(ops.pending_count(), 0);
    }

    // 25 edges with max_batch_size 10 split into ceil(25/10) = 3 chunks.
    #[test]
    fn test_batch_splitting() {
        let mut ops = WasmBatchOps::with_config(BatchConfig {
            max_batch_size: 10,
            ..Default::default()
        });

        let edges: Vec<_> = (0..25).map(|i| (i, i + 1, 1.0)).collect();
        ops.queue_insert_edges(edges);
        assert_eq!(ops.pending_count(), 3);
    }

    // Distance queries return one value per pair; identical endpoints yield 0.
    #[test]
    fn test_distance_queries() {
        let mut ops = WasmBatchOps::new();
        ops.queue_distance_queries(vec![(1, 2), (2, 3), (1, 1)]);

        let outcomes = ops.execute_batch();
        assert_eq!(outcomes.len(), 1);
        assert_eq!(outcomes[0].results.len(), 3);
        assert_eq!(outcomes[0].results[2], 0.0);
    }

    // Bump allocation honors 64-byte alignment and reset rewinds the region.
    #[test]
    fn test_wasm_memory_region() {
        let mut arena = WasmMemoryRegion::new(1024);

        let first = arena.alloc(100, 64);
        assert!(first.is_some());
        assert_eq!(first.unwrap() % 64, 0);

        let second = arena.alloc(200, 64);
        assert!(second.is_some());

        let view = arena.get_slice(first.unwrap(), 100);
        assert!(view.is_some());

        assert!(arena.used() > 0);
        assert!(arena.remaining() < 1024);

        arena.reset();
        assert_eq!(arena.used(), 0);
    }

    // One executed operation covering two items shows up in the stats.
    #[test]
    fn test_batch_stats() {
        let mut ops = WasmBatchOps::new();
        ops.queue_insert_edges(vec![(1, 2, 1.0), (2, 3, 2.0)]);
        let _ = ops.execute_batch();

        let stats = ops.stats();
        assert_eq!(stats.total_operations, 1);
        assert_eq!(stats.total_items, 2);
    }

    // reset empties the transfer buffers.
    #[test]
    fn test_transfer_reset() {
        let mut xfer = TypedArrayTransfer::new();

        xfer.add_edge(1, 2, 1.0);
        assert!(!xfer.is_empty());

        xfer.reset();
        assert!(xfer.is_empty());
    }
}