Skip to main content

reifydb_engine/transform/
wasm.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! WASM transform implementation that executes WebAssembly modules as columnar transforms
5
6use reifydb_core::value::column::columns::Columns;
7use reifydb_sdk::{
8	error::FFIError,
9	marshal::wasm::{marshal_columns_to_bytes, unmarshal_columns_from_bytes},
10};
11use reifydb_type::Result;
12use reifydb_wasm::{Engine, SpawnBinary, module::value::Value, source};
13
14use super::{Transform, context::TransformContext};
15
16/// WASM transform that loads and executes a `.wasm` module.
17///
18/// Each WASM module must export:
19/// - `alloc(size: i32) -> i32` — allocate `size` bytes, return pointer
20/// - `dealloc(ptr: i32, size: i32)` — free memory
21/// - `transform(input_ptr: i32, input_len: i32) -> i32` — pointer to output (first 4 bytes at output pointer = output
22///   length as LE u32)
23pub struct WasmTransform {
24	name: String,
25	wasm_bytes: Vec<u8>,
26}
27
28impl WasmTransform {
29	pub fn new(name: impl Into<String>, wasm_bytes: Vec<u8>) -> Self {
30		Self {
31			name: name.into(),
32			wasm_bytes,
33		}
34	}
35
36	pub fn name(&self) -> &str {
37		&self.name
38	}
39}
40
41// SAFETY: WasmTransform only holds inert data (name + bytes).
42// A fresh Engine is created per invocation, so no shared mutable state.
43unsafe impl Send for WasmTransform {}
44unsafe impl Sync for WasmTransform {}
45
46impl Transform for WasmTransform {
47	fn apply(&self, _ctx: &TransformContext, input: Columns) -> Result<Columns> {
48		let input_bytes = marshal_columns_to_bytes(&input);
49
50		let mut engine = Engine::default();
51		engine.spawn(source::binary::bytes(&self.wasm_bytes)).map_err(|e| {
52			FFIError::Other(format!("WASM transform '{}' failed to load: {:?}", self.name, e))
53		})?;
54
55		// Allocate space in WASM linear memory
56		let alloc_result = engine.invoke("alloc", &[Value::I32(input_bytes.len() as i32)]).map_err(|e| {
57			FFIError::Other(format!("WASM transform '{}' alloc failed: {:?}", self.name, e))
58		})?;
59
60		let input_ptr = match alloc_result.first() {
61			Some(Value::I32(v)) => *v,
62			_ => {
63				return Err(FFIError::Other(format!(
64					"WASM transform '{}': alloc returned unexpected result",
65					self.name
66				))
67				.into());
68			}
69		};
70
71		// Write input data into WASM memory
72		engine.write_memory(input_ptr as usize, &input_bytes).map_err(|e| {
73			FFIError::Other(format!("WASM transform '{}' write_memory failed: {:?}", self.name, e))
74		})?;
75
76		// Call transform
77		let result = engine
78			.invoke("transform", &[Value::I32(input_ptr), Value::I32(input_bytes.len() as i32)])
79			.map_err(|e| {
80				FFIError::Other(format!(
81					"WASM transform '{}' transform call failed: {:?}",
82					self.name, e
83				))
84			})?;
85
86		let output_ptr = match result.first() {
87			Some(Value::I32(v)) => *v as usize,
88			_ => {
89				return Err(FFIError::Other(format!(
90					"WASM transform '{}': transform returned unexpected result",
91					self.name
92				))
93				.into());
94			}
95		};
96
97		// Read output length (first 4 bytes at output_ptr)
98		let len_bytes = engine.read_memory(output_ptr, 4).map_err(|e| {
99			FFIError::Other(format!("WASM transform '{}' read output length failed: {:?}", self.name, e))
100		})?;
101
102		let output_len = u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]]) as usize;
103
104		// Read full output data
105		let output_bytes = engine.read_memory(output_ptr + 4, output_len).map_err(|e| {
106			FFIError::Other(format!("WASM transform '{}' read output data failed: {:?}", self.name, e))
107		})?;
108
109		Ok(unmarshal_columns_from_bytes(&output_bytes))
110	}
111}