use flowcore::{Implementation, RunAgain};
use flowcore::errors::*;
use flowcore::meta_provider::Provider;
use log::{info, trace};
use serde_json::Value;
use std::cmp::max;
use std::sync::Arc;
use std::sync::Mutex;
use url::Url;
use wasmi::{
ExternVal, ImportsBuilder, MemoryRef, Module, ModuleInstance, ModuleRef, NopExternals,
RuntimeValue, Signature, ValueType,
};
/// File name handed to `Provider::resolve_url` (together with the `wasm` extension) when
/// `load` resolves the url of a module.
// NOTE(review): presumably the name tried when the url does not name a file itself -
// confirm against the `Provider` implementation.
const DEFAULT_WASM_FILENAME: &str = "module";
/// Size limit, in bytes, shared between host and guest: `send_byte_array` never asks the
/// guest's `alloc` for less than this, and `run` rejects any result length reported by
/// `run_wasm` that is larger than this.
const MAX_RESULT_SIZE: i32 = 1024;
/// A loaded wasm module that can be run as an `Implementation`.
///
/// Instances are created by `load`, which instantiates the module, looks up its exported
/// `memory` and checks that the required functions are exported.
#[derive(Debug)]
pub struct WasmExecutor {
/// The instantiated wasm module, locked for the duration of each `run`.
module: Arc<Mutex<ModuleRef>>,
/// The module's exported `memory`, used to pass inputs in and read results back.
memory: Arc<Mutex<MemoryRef>>,
/// The url `load` was asked to load from; only used in error messages.
source_url: Url,
}
impl WasmExecutor {
fn new(module_ref: ModuleRef, memory: MemoryRef, source_url: &Url) -> Self {
WasmExecutor {
module: Arc::new(Mutex::new(module_ref)),
memory: Arc::new(Mutex::new(memory)),
source_url: source_url.clone(),
}
}
}
// SAFETY: NOTE(review) - these impls assert thread-safety that the compiler did not derive
// for `WasmExecutor`, presumably because wasmi's `ModuleRef` / `MemoryRef` are not
// `Send`/`Sync` themselves (confirm against the wasmi version in use). Within this file
// the module and memory are only reached through their mutexes, but the two mutexes are
// independent and `memory` was obtained from the same module instance, so soundness
// relies on callers never using them concurrently in other ways - TODO confirm.
unsafe impl Send for WasmExecutor {}
// SAFETY: see the review note on the `Send` impl directly above; the same assumption applies.
unsafe impl Sync for WasmExecutor {}
/// Copies `bytes` into the guest's linear memory and returns the guest address used.
///
/// The buffer is obtained by calling the guest's exported `alloc` function, asking for
/// the larger of `bytes.len()` and `MAX_RESULT_SIZE`.
///
/// Returns `0` if `alloc` fails, returns anything other than a single `i32`, or if the
/// bytes cannot be written to `memory` at the returned address.
fn send_byte_array(instance: &ModuleRef, memory: &MemoryRef, bytes: &[u8]) -> u32 {
    let requested = max(bytes.len() as i32, MAX_RESULT_SIZE);
    let alloc_args = [RuntimeValue::I32(requested)];

    // Anything other than a single i32 coming back from `alloc` counts as a failure.
    let address = match instance.invoke_export("alloc", &alloc_args, &mut NopExternals) {
        Ok(Some(RuntimeValue::I32(pointer))) => pointer as u32,
        _ => return 0_u32,
    };

    if memory.set(address, bytes).is_ok() {
        address
    } else {
        0_u32
    }
}
impl Implementation for WasmExecutor {
    /// Runs the wasm implementation on `inputs`.
    ///
    /// The inputs are serialized to JSON and copied into the guest's memory with
    /// `send_byte_array`, then the guest's exported `run_wasm` function is invoked with
    /// the address and length of that data. `run_wasm` must return an `i32` giving the
    /// length of the result, which is read back from the *same* address and deserialized
    /// from JSON into this function's return value.
    ///
    /// # Errors
    ///
    /// Returns an error (rather than silently reporting "no output") when:
    /// - the module or memory mutex is poisoned,
    /// - the inputs cannot be serialized, or are too large to describe with an `i32`,
    /// - the inputs could not be copied into the guest's memory,
    /// - `run_wasm` fails, returns nothing, or returns something other than an `i32`,
    /// - the reported result length is negative or exceeds `MAX_RESULT_SIZE`,
    /// - the result cannot be read from memory or is not valid JSON for the return type.
    fn run(&self, inputs: &[Value]) -> Result<(Option<Value>, RunAgain)> {
        // Always lock the module before the memory so the lock order never varies.
        let module_ref = self
            .module
            .lock()
            .map_err(|_| "Could not lock wasm module: mutex is poisoned")?;
        let memory_ref = self
            .memory
            .lock()
            .map_err(|_| "Could not lock wasm memory: mutex is poisoned")?;

        let input_data =
            serde_json::to_vec(inputs).chain_err(|| "Could not convert inputs to json")?;

        // The length is handed to the guest as an i32, so refuse anything that would be
        // silently truncated by the cast below.
        if input_data.len() > i32::MAX as usize {
            bail!(
                "Serialized inputs of {} bytes are too large to pass to wasm function",
                input_data.len()
            );
        }

        // `send_byte_array` signals any failure (alloc or memory write) by returning 0.
        let input_data_wasm_ptr = send_byte_array(&module_ref, &memory_ref, &input_data);
        if input_data_wasm_ptr == 0 {
            bail!(
                "Could not copy inputs into memory of wasm module '{}'",
                self.source_url
            );
        }

        let outcome = module_ref.invoke_export(
            "run_wasm",
            &[
                RuntimeValue::I32(input_data_wasm_ptr as i32),
                RuntimeValue::I32(input_data.len() as i32),
            ],
            &mut NopExternals,
        );

        match outcome {
            Ok(Some(RuntimeValue::I32(result_length))) => {
                trace!("Return length from wasm function of {}", result_length);
                if result_length > MAX_RESULT_SIZE {
                    bail!(
                        "Return length from wasm function of {} exceed maximum allowed",
                        result_length
                    );
                }
                // A negative length would otherwise wrap to a huge usize in the cast below.
                if result_length < 0 {
                    bail!(
                        "Return length from wasm function of {} is negative",
                        result_length
                    );
                }

                // The guest writes its result over the input buffer, so read it back
                // from the address the inputs were sent to.
                let result_data = memory_ref
                    .get(input_data_wasm_ptr, result_length as usize)
                    .chain_err(|| "Could not get wasm memory reference")?;
                let decoded = serde_json::from_slice(result_data.as_slice())
                    .chain_err(|| "Could not convert returned data to json")?;
                trace!("WASM run() function invocation Result = {:?}", decoded);
                decoded
            }
            Ok(Some(_)) => {
                bail!("Unexpected return value from wasm function on invoke_export()");
            }
            Ok(None) => {
                bail!(
                    "None value returned by Wasm invoke_export(): {:?}\nInputs:\n{:?}",
                    self.source_url,
                    inputs
                );
            }
            Err(err) => {
                bail!(
                    "Error returned by Wasm invoke_export() on '{}': {:?}\nInputs:\n{:?}",
                    self.source_url,
                    err,
                    inputs
                );
            }
        }
    }
}
/// Loads a wasm module from `source_url` and wraps it in a [`WasmExecutor`].
///
/// The url is first resolved through `provider` (using the default file name and the
/// `wasm` extension), the content is fetched and parsed as a wasm module, and the module
/// is instantiated with no imports. The instance must export a memory named `memory` and
/// the functions verified by `check_required_functions`.
///
/// # Errors
///
/// Returns an error if the url cannot be resolved, the content cannot be fetched or is
/// not a valid wasm module, the module cannot be instantiated, the `memory` export is
/// missing or is not a memory, or a required function is missing or has the wrong
/// signature.
///
/// # Panics
///
/// NOTE(review): instantiation finishes with wasmi's `assert_no_start()`, which is
/// documented to panic when the module declares a start function - confirm for the
/// wasmi version in use.
pub fn load(provider: &dyn Provider, source_url: &Url) -> Result<WasmExecutor> {
    let (resolved_url, _) = provider
        .resolve_url(source_url, DEFAULT_WASM_FILENAME, &["wasm"])
        .chain_err(|| {
            format!(
                "Could not resolve url '{}' while attempting to load wasm",
                source_url
            )
        })?;

    let content = provider.get_contents(&resolved_url).chain_err(|| {
        format!(
            "Could not fetch content from url '{}' for loading wasm",
            resolved_url
        )
    })?;

    let module = Module::from_buffer(content).chain_err(|| {
        format!(
            "Could not create Wasm Module from content in '{}'",
            resolved_url
        )
    })?;

    // No imports are supplied: the module has to be self-contained.
    let module_ref = ModuleInstance::new(&module, &ImportsBuilder::default())
        .chain_err(|| "Could not create new ModuleInstance when loading WASM content")?
        .assert_no_start();

    // Keep the export alive in its own binding while the memory reference is cloned.
    let memory_export = module_ref
        .export_by_name("memory")
        .chain_err(|| "`memory` export not found")?;
    let memory = memory_export
        .as_memory()
        .chain_err(|| "export name `memory` is not of memory type")?
        .to_owned();

    check_required_functions(&module_ref, &resolved_url)?;

    info!("Loaded wasm module from: '{}'", source_url);

    Ok(WasmExecutor::new(module_ref, memory, source_url))
}
/// Verifies that the module exports every function the executor calls.
///
/// The required exports are `alloc` with signature `(i32) -> i32` and `run_wasm` with
/// signature `(i32, i32) -> i32`.
///
/// # Errors
///
/// Returns an error naming the function and `filename` if an export is missing, and an
/// error if an export with the right name is not a function or has a different signature.
fn check_required_functions(module_ref: &ModuleRef, filename: &Url) -> Result<()> {
    let required_wasm_functions = [
        (
            "alloc",
            Signature::new(&[ValueType::I32][..], Some(ValueType::I32)),
        ),
        (
            "run_wasm",
            Signature::new(&[ValueType::I32, ValueType::I32][..], Some(ValueType::I32)),
        ),
    ];

    for (function_name, signature) in required_wasm_functions.iter() {
        // Build the error message lazily: it is only needed when the export is missing.
        let export = module_ref.export_by_name(function_name).ok_or_else(|| {
            format!(
                "No function named '{}' found in wasm file '{}'",
                function_name, filename
            )
        })?;

        match export {
            ExternVal::Func(function_ref) => {
                let sig = function_ref.signature();
                if *sig != *signature {
                    bail!(
                        "Expected function signature '{:?}' and found signature '{:?}'",
                        signature,
                        sig
                    );
                }
            }
            _ => {
                bail!("Exported value was not a function");
            }
        }
    }

    Ok(())
}