use std::cmp::max;
use std::sync::{Arc, Mutex};
use log::info;
use log::trace;
use serde_json::Value;
use url::Url;
use wasmtime::*;
use flowcore::{Implementation, RunAgain};
use flowcore::errors::*;
use flowcore::provider::Provider;
const DEFAULT_WASM_FILENAME: &str = "module";
const MAX_RESULT_SIZE: i32 = 1024;
#[derive(Debug)]
pub struct WasmExecutor {
store: Arc<Mutex<Store<()>>>,
memory: Memory,
implementation: Func,
alloc: Func,
source_url: Url,
}
impl WasmExecutor {
fn send_inputs(&self, store: &mut Store<()>, inputs: &[Value]) -> Result<(i32, i32)> {
let input_data = serde_json::to_vec(&inputs)?;
let alloc_size = max(input_data.len() as i32, MAX_RESULT_SIZE);
let offset = self.alloc(alloc_size, store)?;
self.memory
.write(store, offset as usize, &input_data)
.map_err(|_| "Could not write to WASM Linear Memory")?;
Ok((offset, input_data.len() as i32))
}
fn alloc(&self, length: i32, store: &mut Store<()>) -> Result<i32> {
let mut results: [Val;1] = [Val::I32(0)];
let params = [Val::I32(length)];
self.alloc.call(store, ¶ms, &mut results)
.map_err(|_| "WASM alloc() call failed")?;
match results[0] {
Val::I32(offset) => Ok(offset),
_ => bail!("WASM alloc() failed"),
}
}
fn call(&self, offset: i32, length: i32, store: &mut Store<()>) -> Result<i32> {
let mut results: [Val;1] = [Val::I32(0)];
let params = [Val::I32(offset), Val::I32(length)];
self.implementation
.call(store, ¶ms, &mut results)
.map_err(|_| format!("Error returned by WASM implementation.call() for {:?}",
self.source_url))?;
match results[0] {
Val::I32(result_length) => {
trace!("Return length from wasm function of {}", result_length);
if result_length > MAX_RESULT_SIZE {
bail!(
"Return length from wasm function of {} exceed maximum allowed",
result_length
);
}
Ok(result_length)
},
_ => bail!(format!("Unexpected value returned by WASM Func.call()()"))
}
}
fn get_result(
&self,
result_length: i32,
offset: usize,
store: &mut Store<()>,
) -> Result<(Option<Value>, RunAgain)> {
let mut buffer: Vec<u8> = vec![0u8; result_length as usize];
self
.memory
.read(store, offset, &mut buffer)
.map_err(|_| "could not read return value from WASM linear memory")?;
let result_returned = serde_json::from_slice(buffer.as_slice())
.chain_err(|| "Could not convert returned data from wasm to json")?;
trace!("WASM run() function invocation Result = {:?}", result_returned);
result_returned
}
}
impl Implementation for WasmExecutor {
fn run(&self, inputs: &[Value]) -> Result<(Option<Value>, RunAgain)> {
let mut store = self.store.lock().map_err(|_| "Could not lock WASM store")?;
let (offset, length) = self.send_inputs(&mut store, inputs)?;
let result_length = self.call(offset, length, &mut store)?;
self.get_result(result_length, offset as usize, &mut store)
}
}
unsafe impl Send for WasmExecutor {}
unsafe impl Sync for WasmExecutor {}
pub fn load(provider: &dyn Provider, source_url: &Url) -> Result<WasmExecutor> {
trace!("Attempting to load WASM module from '{}'", source_url);
let (resolved_url, _) = provider
.resolve_url(source_url, DEFAULT_WASM_FILENAME, &["wasm"])
.chain_err(|| format!("Could not resolve url '{source_url}' for wasm file"))?;
let content = provider.get_contents(&resolved_url).chain_err(|| {
format!("Could not fetch content from url '{resolved_url}' for loading wasm")
})?;
let mut store: Store<()> = Store::default();
let module = Module::new(store.engine(), content)
.map_err(|e| format!("Could not create WASM Module: {e}"))?;
let instance = Instance::new(&mut store, &module, &[])
.map_err(|e| format!("Could not create WASM Instance: {e}"))?;
let memory = instance
.get_memory(&mut store, "memory")
.ok_or("Could not get WASM linear memory")?;
let implementation = instance
.get_func(&mut store, "run_wasm")
.ok_or("Could not get the WASM instance() function")?;
let alloc = instance
.get_func(&mut store, "alloc")
.ok_or("Could not get the WASM alloc() function")?;
info!("Loaded wasm module from: '{source_url}'");
Ok(WasmExecutor {
store: Arc::new(Mutex::new(store)),
memory,
implementation,
alloc,
source_url: source_url.clone(),
})
}
#[cfg(test)]
mod test {
use std::path::Path;
use url::Url;
use flowcore::content::file_provider::FileProvider;
use serde_json::json;
use flowcore::Implementation;
#[test]
fn load_test_wasm() {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/add.wasm");
let url = Url::from_file_path(path).expect("Could not convert path to Url");
let adder = &super::load(&FileProvider{}, &url)
.expect("Coudl not load test_wasm.wasm") as &dyn Implementation;
let inputs = vec![json!(1), json!(2)];
let (value, run_again) = adder.run(&inputs).expect("Could not call run");
assert_eq!(value, Some(json!(3)));
assert!(run_again);
}
}