use std::cmp::max;
use std::sync::{Arc, Mutex};
use log::info;
use log::trace;
use serde_json::Value;
use url::Url;
use wasmtime::{Config, Engine, Func, Instance, Memory, Module, Store, Val};
use flowcore::errors::{bail, Result, ResultExt};
use flowcore::provider::Provider;
use flowcore::{Implementation, RunAgain};
const DEFAULT_WASM_FILENAME: &str = "module";
const MAX_RESULT_SIZE: i32 = 256 * 1024;
#[derive(Debug)]
pub struct Executor {
store: Arc<Mutex<Store<()>>>,
memory: Memory,
implementation: Func,
alloc: Func,
source_url: Url,
}
impl Executor {
fn send_inputs(&self, store: &mut Store<()>, inputs: &[Value]) -> Result<(i32, i32)> {
let input_data = serde_json::to_vec(&inputs)?;
let alloc_size = max(i32::try_from(input_data.len())?, MAX_RESULT_SIZE);
let offset = self.alloc(alloc_size, store)?;
self.memory
.write(store, usize::try_from(offset)?, &input_data)
.map_err(|_| "Could not write to WASM Linear Memory")?;
let data_size = i32::try_from(input_data.len())?;
Ok((offset, data_size))
}
fn alloc(&self, length: i32, store: &mut Store<()>) -> Result<i32> {
let mut results: [Val; 1] = [Val::I32(0)];
let params = [Val::I32(length)];
self.alloc
.call(store, ¶ms, &mut results)
.map_err(|_| "WASM alloc() call failed")?;
match results[0] {
Val::I32(offset) => Ok(offset),
_ => bail!("WASM alloc() failed"),
}
}
fn call(&self, offset: i32, length: i32, store: &mut Store<()>) -> Result<i32> {
let mut results: [Val; 1] = [Val::I32(0)];
let params = [Val::I32(offset), Val::I32(length)];
self.implementation
.call(store, ¶ms, &mut results)
.map_err(|e| {
format!(
"Error returned by WASM implementation.call() for {:?} => '{}'",
self.source_url, e
)
})?;
match results[0] {
Val::I32(result_length) => {
trace!("Return length from wasm function of {result_length}");
if result_length > MAX_RESULT_SIZE {
bail!(
"Return length from wasm function of {} exceeds maximum allowed",
result_length
);
}
Ok(result_length)
}
_ => bail!("Unexpected value returned by WASM Func.call()()"),
}
}
fn get_result(
&self,
result_length: i32,
offset: usize,
store: &mut Store<()>,
) -> Result<(Option<Value>, RunAgain)> {
assert!(result_length >= 0, "result_length was negative");
#[allow(clippy::cast_sign_loss)]
let mut buffer: Vec<u8> = vec![0u8; result_length as usize];
self.memory
.read(store, offset, &mut buffer)
.map_err(|_| "could not read return value from WASM linear memory")?;
let result_returned = serde_json::from_slice(buffer.as_slice())
.chain_err(|| "Could not convert returned data from wasm to json")?;
trace!("WASM run() function invocation Result = {result_returned:?}");
result_returned
}
}
impl Implementation for Executor {
fn run(&self, inputs: &[Value]) -> Result<(Option<Value>, RunAgain)> {
let mut store = self.store.lock().map_err(|_| "Could not lock WASM store")?;
let (offset, length) = self.send_inputs(&mut store, inputs)?;
let result_length = self.call(offset, length, &mut store)?;
assert!(offset >= 0, "offset was negative");
#[allow(clippy::cast_sign_loss)]
self.get_result(result_length, offset as usize, &mut store)
}
}
pub fn load(provider: &Arc<dyn Provider>, source_url: &Url) -> Result<Executor> {
trace!("Attempting to load WASM module from '{source_url}'");
let (resolved_url, _) = provider
.resolve_url(source_url, DEFAULT_WASM_FILENAME, &["wasm"])
.chain_err(|| format!("Could not resolve url '{source_url}' for wasm file"))?;
let content = provider.get_contents(&resolved_url).chain_err(|| {
format!("Could not fetch content from url '{resolved_url}' for loading wasm")
})?;
let mut config = Config::new();
config.max_wasm_stack(2 * 1024 * 1024);
let engine = Engine::new(&config).map_err(|e| format!("Could not create WASM Engine: {e}"))?;
let mut store: Store<()> = Store::new(&engine, ());
let module = Module::from_binary(&engine, &content)
.map_err(|e| format!("Could not create WASM Module: {e}"))?;
let instance = Instance::new(&mut store, &module, &[])
.map_err(|e| format!("Could not create WASM Instance: {e}"))?;
let memory = instance
.get_memory(&mut store, "memory")
.ok_or("Could not get WASM linear memory")?;
let implementation = instance
.get_func(&mut store, "run_wasm")
.ok_or("Could not get the WASM instance() function")?;
let alloc = instance
.get_func(&mut store, "alloc")
.ok_or("Could not get the WASM alloc() function")?;
info!("Loaded wasm module from: '{source_url}'");
Ok(Executor {
store: Arc::new(Mutex::new(store)),
memory,
implementation,
alloc,
source_url: source_url.clone(),
})
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod test {
use std::path::Path;
use std::sync::Arc;
use serde_json::json;
use url::Url;
use flowcore::content::file_provider::FileProvider;
use flowcore::provider::Provider;
use flowcore::Implementation;
#[test]
fn load_test_wasm() {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/add.wasm");
let url = Url::from_file_path(path).expect("Could not convert path to Url");
let provider = Arc::new(FileProvider {}) as Arc<dyn Provider>;
let adder = &super::load(&provider, &url).expect("Could not load test_wasm.wasm")
as &dyn Implementation;
let inputs = vec![json!(1), json!(2)];
let (value, run_again) = adder.run(&inputs).expect("Could not call run");
assert_eq!(value, Some(json!(3)));
assert!(run_again);
}
}