reifydb_engine/transform/
wasm.rs1use reifydb_core::value::column::columns::Columns;
7use reifydb_sdk::{
8 error::FFIError,
9 marshal::wasm::{marshal_columns_to_bytes, unmarshal_columns_from_bytes},
10};
11use reifydb_type::Result;
12use reifydb_wasm::{Engine, SpawnBinary, module::value::Value, source};
13
14use super::{Transform, context::TransformContext};
15
/// A [`Transform`] whose logic is supplied as a WebAssembly module.
///
/// The value only stores the module's raw bytes and a name; the module is
/// spawned in an engine each time the transform is applied.
pub struct WasmTransform {
    /// Label for this transform; it is interpolated into every error message
    /// produced while applying the transform.
    name: String,
    /// Raw bytes of the WASM module, handed to the engine when the transform runs.
    wasm_bytes: Vec<u8>,
}
27
28impl WasmTransform {
29 pub fn new(name: impl Into<String>, wasm_bytes: Vec<u8>) -> Self {
30 Self {
31 name: name.into(),
32 wasm_bytes,
33 }
34 }
35
36 pub fn name(&self) -> &str {
37 &self.name
38 }
39}
40
41unsafe impl Send for WasmTransform {}
44unsafe impl Sync for WasmTransform {}
45
46impl Transform for WasmTransform {
47 fn apply(&self, _ctx: &TransformContext, input: Columns) -> Result<Columns> {
48 let input_bytes = marshal_columns_to_bytes(&input);
49
50 let mut engine = Engine::default();
51 engine.spawn(source::binary::bytes(&self.wasm_bytes)).map_err(|e| {
52 FFIError::Other(format!("WASM transform '{}' failed to load: {:?}", self.name, e))
53 })?;
54
55 let alloc_result = engine.invoke("alloc", &[Value::I32(input_bytes.len() as i32)]).map_err(|e| {
57 FFIError::Other(format!("WASM transform '{}' alloc failed: {:?}", self.name, e))
58 })?;
59
60 let input_ptr = match alloc_result.first() {
61 Some(Value::I32(v)) => *v,
62 _ => {
63 return Err(FFIError::Other(format!(
64 "WASM transform '{}': alloc returned unexpected result",
65 self.name
66 ))
67 .into());
68 }
69 };
70
71 engine.write_memory(input_ptr as usize, &input_bytes).map_err(|e| {
73 FFIError::Other(format!("WASM transform '{}' write_memory failed: {:?}", self.name, e))
74 })?;
75
76 let result = engine
78 .invoke("transform", &[Value::I32(input_ptr), Value::I32(input_bytes.len() as i32)])
79 .map_err(|e| {
80 FFIError::Other(format!(
81 "WASM transform '{}' transform call failed: {:?}",
82 self.name, e
83 ))
84 })?;
85
86 let output_ptr = match result.first() {
87 Some(Value::I32(v)) => *v as usize,
88 _ => {
89 return Err(FFIError::Other(format!(
90 "WASM transform '{}': transform returned unexpected result",
91 self.name
92 ))
93 .into());
94 }
95 };
96
97 let len_bytes = engine.read_memory(output_ptr, 4).map_err(|e| {
99 FFIError::Other(format!("WASM transform '{}' read output length failed: {:?}", self.name, e))
100 })?;
101
102 let output_len = u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]]) as usize;
103
104 let output_bytes = engine.read_memory(output_ptr + 4, output_len).map_err(|e| {
106 FFIError::Other(format!("WASM transform '{}' read output data failed: {:?}", self.name, e))
107 })?;
108
109 Ok(unmarshal_columns_from_bytes(&output_bytes))
110 }
111}