use reifydb_core::value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns};
use reifydb_routine::routine::{
Function, FunctionKind, Routine, RoutineInfo, context::FunctionContext, error::RoutineError,
};
use reifydb_sdk::marshal::wasm::{marshal_columns_to_bytes, unmarshal_columns_from_bytes};
use reifydb_type::{fragment::Fragment, value::r#type::Type};
use crate::loader::wasm::invoke_wasm_module;
pub struct WasmScalarFunction {
info: RoutineInfo,
wasm_bytes: Vec<u8>,
}
impl WasmScalarFunction {
pub fn new(name: impl Into<String>, wasm_bytes: Vec<u8>) -> Self {
let name = name.into();
Self {
info: RoutineInfo::new(&name),
wasm_bytes,
}
}
pub fn name(&self) -> &str {
&self.info.name
}
fn err(&self, reason: impl Into<String>) -> RoutineError {
RoutineError::FunctionExecutionFailed {
function: Fragment::internal(&self.info.name),
reason: reason.into(),
}
}
}
unsafe impl Send for WasmScalarFunction {}
unsafe impl Sync for WasmScalarFunction {}
impl<'a> Routine<FunctionContext<'a>> for WasmScalarFunction {
fn info(&self) -> &RoutineInfo {
&self.info
}
fn return_type(&self, _input_types: &[Type]) -> Type {
Type::Any
}
fn execute(&self, ctx: &mut FunctionContext<'a>, args: &Columns) -> Result<Columns, RoutineError> {
let input_bytes = marshal_columns_to_bytes(args);
let label = format!("WASM scalar function '{}'", self.info.name);
let output_bytes = invoke_wasm_module(&self.wasm_bytes, "scalar", &input_bytes, &label)
.map_err(|e| self.err(e.to_string()))?;
let output_columns = unmarshal_columns_from_bytes(&output_bytes);
match output_columns.first() {
Some(col) => {
let data = col.data().clone();
Ok(Columns::new(vec![ColumnWithName::new(ctx.fragment.clone(), data)]))
}
None => {
let data = ColumnBuffer::none_typed(Type::Any, args.row_count());
Ok(Columns::new(vec![ColumnWithName::new(ctx.fragment.clone(), data)]))
}
}
}
}
impl Function for WasmScalarFunction {
fn kinds(&self) -> &[FunctionKind] {
&[FunctionKind::Scalar]
}
}