1use crate::graph::VertexId;
12use crate::time_compat::PortableInstant;
13use std::collections::HashMap;
14
/// Tuning knobs for batching operations across the WASM boundary.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of items per queued operation; larger inputs are split
    /// into multiple operations (see the `queue_*` methods on `WasmBatchOps`).
    pub max_batch_size: usize,
    /// Transfer-buffer size in bytes; element capacity is derived from it
    /// (`buffer_size / 8` in `WasmBatchOps::with_config`).
    pub buffer_size: usize,
    /// Requested byte alignment for buffers. NOTE(review): not read anywhere
    /// in this file — confirm it is consumed by a caller.
    pub alignment: usize,
    /// Whether buffers should be pooled across batches. NOTE(review): not
    /// read anywhere in this file — confirm it is consumed by a caller.
    pub memory_pooling: bool,
}
27
28impl Default for BatchConfig {
29 fn default() -> Self {
30 Self {
31 max_batch_size: 1024,
32 buffer_size: 64 * 1024, alignment: 64, memory_pooling: true,
35 }
36 }
37}
38
/// A queued bulk operation, carrying its full payload.
#[derive(Debug, Clone)]
pub enum BatchOperation {
    /// Insert weighted edges as (source, target, weight) triples.
    InsertEdges(Vec<(VertexId, VertexId, f64)>),
    /// Delete edges identified by (source, target) pairs.
    DeleteEdges(Vec<(VertexId, VertexId)>),
    /// Update edge weights as (source, target, new_weight) triples.
    UpdateWeights(Vec<(VertexId, VertexId, f64)>),
    /// Query distances for (source, target) pairs.
    QueryDistances(Vec<(VertexId, VertexId)>),
    /// Compute cut values, one per vertex partition.
    ComputeCuts(Vec<Vec<VertexId>>),
}
53
/// Outcome of executing a single `BatchOperation`.
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// Name of the executed operation (e.g. "InsertEdges").
    pub operation: String,
    /// Number of payload items the operation covered.
    pub items_processed: usize,
    /// Wall-clock time for this operation, in microseconds.
    pub time_us: u64,
    /// Numeric results (distances, cut values); empty for mutations.
    pub results: Vec<f64>,
    /// Error description, or `None` on success.
    pub error: Option<String>,
}
68
/// Cache-line-aligned staging buffers for typed-array-style data transfer
/// across the WASM boundary (raw pointers are exposed via `f64_ptr`/`u64_ptr`).
#[repr(C, align(64))]
pub struct TypedArrayTransfer {
    /// Per-item f64 payload (edge weights, distances).
    pub f64_buffer: Vec<f64>,
    /// Vertex ids; an edge occupies two consecutive slots (source, target).
    pub u64_buffer: Vec<u64>,
    /// Auxiliary 32-bit values. NOTE(review): never written in this file.
    pub u32_buffer: Vec<u32>,
    /// Raw byte scratch space. NOTE(review): never written in this file.
    pub byte_buffer: Vec<u8>,
    /// Cursor, reset to 0 by `reset()`; not otherwise used in this file.
    position: usize,
}
86
87impl TypedArrayTransfer {
88 pub fn new() -> Self {
90 Self::with_capacity(1024)
91 }
92
93 pub fn with_capacity(capacity: usize) -> Self {
95 Self {
96 f64_buffer: Vec::with_capacity(capacity),
97 u64_buffer: Vec::with_capacity(capacity),
98 u32_buffer: Vec::with_capacity(capacity * 2),
99 byte_buffer: Vec::with_capacity(capacity * 8),
100 position: 0,
101 }
102 }
103
104 pub fn reset(&mut self) {
106 self.f64_buffer.clear();
107 self.u64_buffer.clear();
108 self.u32_buffer.clear();
109 self.byte_buffer.clear();
110 self.position = 0;
111 }
112
113 pub fn add_edge(&mut self, source: VertexId, target: VertexId, weight: f64) {
115 self.u64_buffer.push(source);
116 self.u64_buffer.push(target);
117 self.f64_buffer.push(weight);
118 }
119
120 pub fn add_vertex(&mut self, vertex: VertexId) {
122 self.u64_buffer.push(vertex);
123 }
124
125 pub fn add_distance(&mut self, distance: f64) {
127 self.f64_buffer.push(distance);
128 }
129
130 pub fn get_edges(&self) -> Vec<(VertexId, VertexId, f64)> {
132 let mut edges = Vec::with_capacity(self.f64_buffer.len());
133
134 for (i, &weight) in self.f64_buffer.iter().enumerate() {
135 let source = self.u64_buffer.get(i * 2).copied().unwrap_or(0);
136 let target = self.u64_buffer.get(i * 2 + 1).copied().unwrap_or(0);
137 edges.push((source, target, weight));
138 }
139
140 edges
141 }
142
143 pub fn f64_ptr(&self) -> *const f64 {
145 self.f64_buffer.as_ptr()
146 }
147
148 pub fn u64_ptr(&self) -> *const u64 {
150 self.u64_buffer.as_ptr()
151 }
152
153 pub fn len(&self) -> (usize, usize, usize) {
155 (
156 self.f64_buffer.len(),
157 self.u64_buffer.len(),
158 self.u32_buffer.len(),
159 )
160 }
161
162 pub fn is_empty(&self) -> bool {
164 self.f64_buffer.is_empty() && self.u64_buffer.is_empty()
165 }
166}
167
168impl Default for TypedArrayTransfer {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
/// Batches graph operations so they can cross the WASM boundary together:
/// operations are queued, then executed in one pass by `execute_batch`.
pub struct WasmBatchOps {
    /// Batching configuration (chunk cap, buffer sizing).
    config: BatchConfig,
    /// Staging buffers each operation's payload is copied into.
    transfer: TypedArrayTransfer,
    /// FIFO queue of operations awaiting execution.
    pending: Vec<BatchOperation>,
    /// Lifetime count of executed operations.
    total_ops: u64,
    /// Lifetime count of items processed across all operations.
    total_items: u64,
    /// Lifetime execution time across all operations, in microseconds.
    total_time_us: u64,
}
186
187impl WasmBatchOps {
188 pub fn new() -> Self {
190 Self::with_config(BatchConfig::default())
191 }
192
193 pub fn with_config(config: BatchConfig) -> Self {
195 Self {
196 transfer: TypedArrayTransfer::with_capacity(config.buffer_size / 8),
197 config,
198 pending: Vec::new(),
199 total_ops: 0,
200 total_items: 0,
201 total_time_us: 0,
202 }
203 }
204
205 pub fn queue_insert_edges(&mut self, edges: Vec<(VertexId, VertexId, f64)>) {
207 if edges.len() > self.config.max_batch_size {
208 for chunk in edges.chunks(self.config.max_batch_size) {
210 self.pending
211 .push(BatchOperation::InsertEdges(chunk.to_vec()));
212 }
213 } else {
214 self.pending.push(BatchOperation::InsertEdges(edges));
215 }
216 }
217
218 pub fn queue_delete_edges(&mut self, edges: Vec<(VertexId, VertexId)>) {
220 if edges.len() > self.config.max_batch_size {
221 for chunk in edges.chunks(self.config.max_batch_size) {
222 self.pending
223 .push(BatchOperation::DeleteEdges(chunk.to_vec()));
224 }
225 } else {
226 self.pending.push(BatchOperation::DeleteEdges(edges));
227 }
228 }
229
230 pub fn queue_distance_queries(&mut self, pairs: Vec<(VertexId, VertexId)>) {
232 if pairs.len() > self.config.max_batch_size {
233 for chunk in pairs.chunks(self.config.max_batch_size) {
234 self.pending
235 .push(BatchOperation::QueryDistances(chunk.to_vec()));
236 }
237 } else {
238 self.pending.push(BatchOperation::QueryDistances(pairs));
239 }
240 }
241
242 pub fn execute_batch(&mut self) -> Vec<BatchResult> {
244 let _start = PortableInstant::now();
245
246 let pending_ops: Vec<_> = self.pending.drain(..).collect();
248 let mut results = Vec::with_capacity(pending_ops.len());
249
250 for op in pending_ops {
251 let op_start = PortableInstant::now();
252 let result = self.execute_operation(op);
253 let elapsed = op_start.elapsed().as_micros() as u64;
254
255 self.total_ops += 1;
256 self.total_items += result.items_processed as u64;
257 self.total_time_us += elapsed;
258
259 results.push(result);
260 }
261
262 self.transfer.reset();
263 results
264 }
265
266 fn execute_operation(&mut self, op: BatchOperation) -> BatchResult {
268 match op {
269 BatchOperation::InsertEdges(edges) => {
270 let count = edges.len();
271
272 self.transfer.reset();
274 for (u, v, w) in &edges {
275 self.transfer.add_edge(*u, *v, *w);
276 }
277
278 BatchResult {
281 operation: "InsertEdges".to_string(),
282 items_processed: count,
283 time_us: 0,
284 results: Vec::new(),
285 error: None,
286 }
287 }
288
289 BatchOperation::DeleteEdges(edges) => {
290 let count = edges.len();
291
292 self.transfer.reset();
293 for (u, v) in &edges {
294 self.transfer.add_vertex(*u);
295 self.transfer.add_vertex(*v);
296 }
297
298 BatchResult {
299 operation: "DeleteEdges".to_string(),
300 items_processed: count,
301 time_us: 0,
302 results: Vec::new(),
303 error: None,
304 }
305 }
306
307 BatchOperation::UpdateWeights(updates) => {
308 let count = updates.len();
309
310 self.transfer.reset();
311 for (u, v, w) in &updates {
312 self.transfer.add_edge(*u, *v, *w);
313 }
314
315 BatchResult {
316 operation: "UpdateWeights".to_string(),
317 items_processed: count,
318 time_us: 0,
319 results: Vec::new(),
320 error: None,
321 }
322 }
323
324 BatchOperation::QueryDistances(pairs) => {
325 let count = pairs.len();
326
327 self.transfer.reset();
328 for (u, v) in &pairs {
329 self.transfer.add_vertex(*u);
330 self.transfer.add_vertex(*v);
331 }
332
333 let results: Vec<f64> = pairs
335 .iter()
336 .map(|(u, v)| if u == v { 0.0 } else { 1.0 })
337 .collect();
338
339 BatchResult {
340 operation: "QueryDistances".to_string(),
341 items_processed: count,
342 time_us: 0,
343 results,
344 error: None,
345 }
346 }
347
348 BatchOperation::ComputeCuts(partitions) => {
349 let count = partitions.len();
350
351 BatchResult {
352 operation: "ComputeCuts".to_string(),
353 items_processed: count,
354 time_us: 0,
355 results: vec![0.0; count],
356 error: None,
357 }
358 }
359 }
360 }
361
362 pub fn pending_count(&self) -> usize {
364 self.pending.len()
365 }
366
367 pub fn stats(&self) -> BatchStats {
369 BatchStats {
370 total_operations: self.total_ops,
371 total_items: self.total_items,
372 total_time_us: self.total_time_us,
373 avg_items_per_op: if self.total_ops > 0 {
374 self.total_items as f64 / self.total_ops as f64
375 } else {
376 0.0
377 },
378 avg_time_per_item_us: if self.total_items > 0 {
379 self.total_time_us as f64 / self.total_items as f64
380 } else {
381 0.0
382 },
383 }
384 }
385
386 pub fn clear(&mut self) {
388 self.pending.clear();
389 self.transfer.reset();
390 }
391}
392
393impl Default for WasmBatchOps {
394 fn default() -> Self {
395 Self::new()
396 }
397}
398
/// Aggregate statistics over all batches executed by a `WasmBatchOps`.
#[derive(Debug, Clone, Default)]
pub struct BatchStats {
    /// Number of operations executed.
    pub total_operations: u64,
    /// Number of items processed across all operations.
    pub total_items: u64,
    /// Total execution time across all operations, in microseconds.
    pub total_time_us: u64,
    /// Mean items per operation (0 when no operations have run).
    pub avg_items_per_op: f64,
    /// Mean microseconds per item (0 when no items have been processed).
    pub avg_time_per_item_us: f64,
}
413
/// A cache-line-aligned bump-allocation arena backed by a zeroed `Vec<u8>`.
#[repr(C, align(64))]
pub struct WasmMemoryRegion {
    /// Backing storage, zero-initialized at construction.
    data: Vec<u8>,
    /// Total usable bytes (equals `data.len()`).
    capacity: usize,
    /// Bump pointer: next free byte; only moves forward until `reset()`.
    offset: usize,
}
424
425impl WasmMemoryRegion {
426 pub fn new(size: usize) -> Self {
428 let aligned_size = (size + 63) & !63;
430 Self {
431 data: vec![0u8; aligned_size],
432 capacity: aligned_size,
433 offset: 0,
434 }
435 }
436
437 pub fn alloc(&mut self, size: usize, align: usize) -> Option<usize> {
442 let aligned_offset = (self.offset + align - 1) & !(align - 1);
444
445 if aligned_offset + size > self.capacity {
446 return None;
447 }
448
449 let result = aligned_offset;
450 self.offset = aligned_offset + size;
451 Some(result)
452 }
453
454 pub fn get_slice(&self, offset: usize, len: usize) -> Option<&[u8]> {
456 if offset + len <= self.capacity {
457 Some(&self.data[offset..offset + len])
458 } else {
459 None
460 }
461 }
462
463 pub fn get_slice_mut(&mut self, offset: usize, len: usize) -> Option<&mut [u8]> {
465 if offset + len <= self.capacity {
466 Some(&mut self.data[offset..offset + len])
467 } else {
468 None
469 }
470 }
471
472 pub fn reset(&mut self) {
474 self.offset = 0;
475 }
478
479 pub fn remaining(&self) -> usize {
481 self.capacity - self.offset
482 }
483
484 pub fn used(&self) -> usize {
486 self.offset
487 }
488
489 pub fn as_ptr(&self) -> *const u8 {
491 self.data.as_ptr()
492 }
493}
494
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_typed_array_transfer() {
        let mut staging = TypedArrayTransfer::new();
        staging.add_edge(1, 2, 1.0);
        staging.add_edge(2, 3, 2.0);

        // Round-trip: edges come back exactly as staged.
        assert_eq!(staging.get_edges(), vec![(1, 2, 1.0), (2, 3, 2.0)]);
    }

    #[test]
    fn test_batch_queue() {
        let mut ops = WasmBatchOps::new();
        ops.queue_insert_edges(vec![(1, 2, 1.0), (2, 3, 2.0)]);

        // A small input fits in one pending operation.
        assert_eq!(ops.pending_count(), 1);
    }

    #[test]
    fn test_batch_execute() {
        let mut ops = WasmBatchOps::new();
        ops.queue_insert_edges(vec![(1, 2, 1.0)]);
        ops.queue_delete_edges(vec![(3, 4)]);

        let outcome = ops.execute_batch();
        let names: Vec<_> = outcome.iter().map(|r| r.operation.as_str()).collect();

        assert_eq!(names, ["InsertEdges", "DeleteEdges"]);
        assert_eq!(ops.pending_count(), 0);
    }

    #[test]
    fn test_batch_splitting() {
        let config = BatchConfig {
            max_batch_size: 10,
            ..Default::default()
        };
        let mut ops = WasmBatchOps::with_config(config);

        // 25 edges with a 10-item cap -> chunks of 10, 10, 5.
        let edges: Vec<_> = (0..25).map(|i| (i, i + 1, 1.0)).collect();
        ops.queue_insert_edges(edges);

        assert_eq!(ops.pending_count(), 3);
    }

    #[test]
    fn test_distance_queries() {
        let mut ops = WasmBatchOps::new();
        ops.queue_distance_queries(vec![(1, 2), (2, 3), (1, 1)]);

        let outcome = ops.execute_batch();

        assert_eq!(outcome.len(), 1);
        assert_eq!(outcome[0].results.len(), 3);
        // The (1, 1) self-pair must report distance zero.
        assert_eq!(outcome[0].results[2], 0.0);
    }

    #[test]
    fn test_wasm_memory_region() {
        let mut arena = WasmMemoryRegion::new(1024);

        let first = arena.alloc(100, 64);
        assert!(first.is_some());
        assert_eq!(first.unwrap() % 64, 0);

        assert!(arena.alloc(200, 64).is_some());
        assert!(arena.get_slice(first.unwrap(), 100).is_some());

        assert!(arena.used() > 0);
        assert!(arena.remaining() < 1024);

        arena.reset();
        assert_eq!(arena.used(), 0);
    }

    #[test]
    fn test_batch_stats() {
        let mut ops = WasmBatchOps::new();
        ops.queue_insert_edges(vec![(1, 2, 1.0), (2, 3, 2.0)]);
        let _ = ops.execute_batch();

        let snapshot = ops.stats();
        assert_eq!(snapshot.total_operations, 1);
        assert_eq!(snapshot.total_items, 2);
    }

    #[test]
    fn test_transfer_reset() {
        let mut staging = TypedArrayTransfer::new();
        staging.add_edge(1, 2, 1.0);
        assert!(!staging.is_empty());

        staging.reset();
        assert!(staging.is_empty());
    }
}