Skip to main content

reifydb_function/
wasm.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! WASM scalar function implementation that executes WebAssembly modules as scalar functions
5
6use reifydb_core::value::column::data::ColumnData;
7use reifydb_sdk::marshal::wasm::{marshal_columns_to_bytes, unmarshal_columns_from_bytes};
8use reifydb_type::{fragment::Fragment, value::r#type::Type};
9use reifydb_wasm::{Engine, SpawnBinary, module::value::Value, source};
10
11use super::{ScalarFunction, ScalarFunctionContext};
12use crate::error::{ScalarFunctionError, ScalarFunctionResult};
13
14/// WASM scalar function that loads and executes a `.wasm` module.
15///
16/// Each WASM module must export:
17/// - `alloc(size: i32) -> i32` — allocate `size` bytes, return pointer
18/// - `dealloc(ptr: i32, size: i32)` — free memory
19/// - `scalar(input_ptr: i32, input_len: i32) -> i32` — pointer to output (first 4 bytes at output pointer = output
20///   length as LE u32)
21///
22/// Input: the context's `columns` marshalled as flat binary.
23/// Output: flat binary representing a single-column `Columns`, from which
24///   the first column's `ColumnData` is extracted.
25pub struct WasmScalarFunction {
26	name: String,
27	wasm_bytes: Vec<u8>,
28}
29
30impl WasmScalarFunction {
31	pub fn new(name: impl Into<String>, wasm_bytes: Vec<u8>) -> Self {
32		Self {
33			name: name.into(),
34			wasm_bytes,
35		}
36	}
37
38	pub fn name(&self) -> &str {
39		&self.name
40	}
41
42	fn err(&self, reason: impl Into<String>) -> ScalarFunctionError {
43		ScalarFunctionError::ExecutionFailed {
44			function: Fragment::internal(&self.name),
45			reason: reason.into(),
46		}
47	}
48}
49
50// SAFETY: WasmScalarFunction only holds inert data (name + bytes).
51// A fresh Engine is created per invocation, so no shared mutable state.
52unsafe impl Send for WasmScalarFunction {}
53unsafe impl Sync for WasmScalarFunction {}
54
55impl ScalarFunction for WasmScalarFunction {
56	fn return_type(&self, _input_types: &[Type]) -> Type {
57		Type::Any
58	}
59
60	fn scalar<'a>(&'a self, ctx: ScalarFunctionContext<'a>) -> ScalarFunctionResult<ColumnData> {
61		let input_bytes = marshal_columns_to_bytes(ctx.columns);
62
63		let mut engine = Engine::default();
64		engine.spawn(source::binary::bytes(&self.wasm_bytes))
65			.map_err(|e| self.err(format!("failed to load: {:?}", e)))?;
66
67		// Allocate space in WASM linear memory
68		let alloc_result = engine
69			.invoke("alloc", &[Value::I32(input_bytes.len() as i32)])
70			.map_err(|e| self.err(format!("alloc failed: {:?}", e)))?;
71
72		let input_ptr = match alloc_result.first() {
73			Some(Value::I32(v)) => *v,
74			_ => return Err(self.err("alloc returned unexpected result")),
75		};
76
77		// Write input data
78		engine.write_memory(input_ptr as usize, &input_bytes)
79			.map_err(|e| self.err(format!("write_memory failed: {:?}", e)))?;
80
81		// Call scalar
82		let result = engine
83			.invoke("scalar", &[Value::I32(input_ptr), Value::I32(input_bytes.len() as i32)])
84			.map_err(|e| self.err(format!("scalar call failed: {:?}", e)))?;
85
86		let output_ptr = match result.first() {
87			Some(Value::I32(v)) => *v as usize,
88			_ => return Err(self.err("scalar returned unexpected result")),
89		};
90
91		// Read output length (first 4 bytes at output_ptr)
92		let len_bytes = engine
93			.read_memory(output_ptr, 4)
94			.map_err(|e| self.err(format!("read output length failed: {:?}", e)))?;
95
96		let output_len = u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]]) as usize;
97
98		// Read full output data
99		let output_bytes = engine
100			.read_memory(output_ptr + 4, output_len)
101			.map_err(|e| self.err(format!("read output data failed: {:?}", e)))?;
102
103		// Unmarshal as Columns and extract the first column's data
104		let output_columns = unmarshal_columns_from_bytes(&output_bytes);
105
106		match output_columns.first() {
107			Some(col) => Ok(col.data().clone()),
108			None => Ok(ColumnData::none_typed(Type::Any, ctx.row_count)),
109		}
110	}
111}