use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInput, SmartModuleOutput, SmartModuleTransformErrorStatus,
};
use wasmtime::{TypedFunc, AsContextMut};
use anyhow::Result;
use crate::engine::wasmtime::{
instance::{SmartModuleInstanceContext, SmartModuleTransform},
state::WasmState,
};
/// Typed Wasmtime handle for a transform entry point: three arguments
/// `(i32, i32, u32)` and an `i32` result.
///
/// The arguments are whatever `SmartModuleInstanceContext::write_input`
/// returns; a negative result is treated by the caller as an error status.
type WasmFn = TypedFunc<(i32, i32, u32), i32>;
// Function names for the simple transform kinds.
// NOTE(review): presumably these are the export names passed as `name` to
// `SimpleTansform::try_instantiate` — confirm against the callers.

/// Function name for the `filter` transform.
pub(crate) const FILTER_FN_NAME: &str = "filter";
/// Function name for the `map` transform.
pub(crate) const MAP_FN_NAME: &str = "map";
/// Function name for the `filter_map` transform.
pub(crate) const FILTER_MAP_FN_NAME: &str = "filter_map";
/// Function name for the `array_map` transform.
pub(crate) const ARRAY_MAP_FN_NAME: &str = "array_map";
/// A transform backed by a single Wasm function with the `WasmFn` signature.
///
/// Built by `try_instantiate`, which looks the function up by name in the
/// instance context.
pub(crate) struct SimpleTansform {
/// Typed handle to the Wasm function invoked by `process`.
f: WasmFn,
/// Name the function was looked up under; also used as the `Debug` output.
name: String,
}
impl std::fmt::Debug for SimpleTansform {
    /// Writes only the transform's name; the Wasm function handle is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.name)
    }
}
impl SimpleTansform {
#[tracing::instrument(skip(ctx, store))]
pub(crate) fn try_instantiate(
name: &str,
ctx: &SmartModuleInstanceContext,
store: &mut impl AsContextMut,
) -> Result<Option<Self>> {
match ctx.get_wasm_func(store, name) {
Some(func) => {
func.typed(&mut *store)
.or_else(|_| func.typed(&mut *store))
.map(|f| {
Some(Self {
f,
name: name.to_string(),
})
})
}
None => Ok(None),
}
}
}
impl SmartModuleTransform for SimpleTansform {
    /// Runs the wrapped Wasm function over `input` and returns its output.
    ///
    /// Steps, in order:
    /// 1. start the metrics timer,
    /// 2. write `input` through `ctx` and call the Wasm function with the
    ///    value `write_input` returns,
    /// 3. report the elapsed time,
    /// 4. turn a negative return code into an error, otherwise read the
    ///    output back and add its number of successes to the records-out
    ///    metric.
    ///
    /// # Errors
    ///
    /// * Any error from writing the input, calling the function, or reading
    ///   the output is propagated. If writing or calling fails, the elapsed
    ///   time is not reported because the function returns early.
    /// * A negative return code is converted to a
    ///   `SmartModuleTransformErrorStatus`, falling back to `UnknownError`
    ///   when the code does not convert.
    fn process(
        &mut self,
        input: SmartModuleInput,
        ctx: &mut SmartModuleInstanceContext,
        store: &mut WasmState,
    ) -> Result<SmartModuleOutput> {
        let timer = ctx.metrics_time_start();
        let wasm_args = ctx.write_input(&input, &mut *store)?;
        let status_code = self.f.call(&mut *store, wasm_args)?;
        ctx.metrics_time_elapsed(timer, store);

        // Negative codes signal a failure inside the Wasm function.
        if status_code < 0 {
            return Err(SmartModuleTransformErrorStatus::try_from(status_code)
                .unwrap_or(SmartModuleTransformErrorStatus::UnknownError)
                .into());
        }

        let output: SmartModuleOutput = ctx.read_output(store)?;
        ctx.metrics()
            .add_records_out(output.successes.len() as u64);
        Ok(output)
    }

    /// Returns the name this transform was instantiated with.
    fn name(&self) -> &str {
        self.name.as_str()
    }
}