1use std::cmp::max;
2use std::sync::{Arc, Mutex};
3
4use log::info;
5use log::trace;
6use serde_json::Value;
7use url::Url;
8use wasmtime::*;
9
10use flowcore::{Implementation, RunAgain};
11use flowcore::errors::*;
12use flowcore::provider::Provider;
13
14const DEFAULT_WASM_FILENAME: &str = "module";
15
16const MAX_RESULT_SIZE: i32 = 1024;
17
18#[derive(Debug)]
19pub struct WasmExecutor {
20 store: Arc<Mutex<Store<()>>>,
21 memory: Memory,
22 implementation: Func,
23 alloc: Func,
24 source_url: Url,
25}
26
27impl WasmExecutor {
28 fn send_inputs(&self, store: &mut Store<()>, inputs: &[Value]) -> Result<(i32, i32)> {
31 let input_data = serde_json::to_vec(&inputs)?;
32 let alloc_size = max(input_data.len() as i32, MAX_RESULT_SIZE);
33 let offset = self.alloc(alloc_size, store)?;
34 self.memory
35 .write(store, offset as usize, &input_data)
36 .map_err(|_| "Could not write to WASM Linear Memory")?;
37 Ok((offset, input_data.len() as i32))
38 }
39
40 fn alloc(&self, length: i32, store: &mut Store<()>) -> Result<i32> {
44 let mut results: [Val;1] = [Val::I32(0)];
45 let params = [Val::I32(length)];
46 self.alloc.call(store, ¶ms, &mut results)
47 .map_err(|_| "WASM alloc() call failed")?;
48
49 match results[0] {
50 Val::I32(offset) => Ok(offset),
51 _ => bail!("WASM alloc() failed"),
52 }
53 }
54
55 fn call(&self, offset: i32, length: i32, store: &mut Store<()>) -> Result<i32> {
60 let mut results: [Val;1] = [Val::I32(0)];
61 let params = [Val::I32(offset), Val::I32(length)];
62 self.implementation
63 .call(store, ¶ms, &mut results)
64 .map_err(|_| format!("Error returned by WASM implementation.call() for {:?}",
65 self.source_url))?;
66
67 match results[0] {
68 Val::I32(result_length) => {
69 trace!("Return length from wasm function of {}", result_length);
70 if result_length > MAX_RESULT_SIZE {
71 bail!(
72 "Return length from wasm function of {} exceed maximum allowed",
73 result_length
74 );
75 }
76 Ok(result_length)
77 },
78 _ => bail!(format!("Unexpected value returned by WASM Func.call()()"))
79 }
80 }
81
82 fn get_result(
83 &self,
84 result_length: i32,
85 offset: usize,
86 store: &mut Store<()>,
87 ) -> Result<(Option<Value>, RunAgain)> {
88 let mut buffer: Vec<u8> = vec![0u8; result_length as usize];
89 self
90 .memory
91 .read(store, offset, &mut buffer)
92 .map_err(|_| "could not read return value from WASM linear memory")?;
93
94 let result_returned = serde_json::from_slice(buffer.as_slice())
95 .chain_err(|| "Could not convert returned data from wasm to json")?;
96 trace!("WASM run() function invocation Result = {:?}", result_returned);
97 result_returned
98 }
99}
100
101
102impl Implementation for WasmExecutor {
103 fn run(&self, inputs: &[Value]) -> Result<(Option<Value>, RunAgain)> {
104 let mut store = self.store.lock().map_err(|_| "Could not lock WASM store")?;
105 let (offset, length) = self.send_inputs(&mut store, inputs)?;
106 let result_length = self.call(offset, length, &mut store)?;
107 self.get_result(result_length, offset as usize, &mut store)
108 }
109}
110
111unsafe impl Send for WasmExecutor {}
112
113unsafe impl Sync for WasmExecutor {}
114
115pub fn load(provider: &dyn Provider, source_url: &Url) -> Result<WasmExecutor> {
117 trace!("Attempting to load WASM module from '{}'", source_url);
118 let (resolved_url, _) = provider
119 .resolve_url(source_url, DEFAULT_WASM_FILENAME, &["wasm"])
120 .chain_err(|| format!("Could not resolve url '{source_url}' for wasm file"))?;
121 let content = provider.get_contents(&resolved_url).chain_err(|| {
122 format!("Could not fetch content from url '{resolved_url}' for loading wasm")
123 })?;
124
125 let mut store: Store<()> = Store::default();
126 let module = Module::new(store.engine(), content)
127 .map_err(|e| format!("Could not create WASM Module: {e}"))?;
128 let instance = Instance::new(&mut store, &module, &[])
129 .map_err(|e| format!("Could not create WASM Instance: {e}"))?;
130 let memory = instance
131 .get_memory(&mut store, "memory")
132 .ok_or("Could not get WASM linear memory")?;
133 let implementation = instance
134 .get_func(&mut store, "run_wasm")
135 .ok_or("Could not get the WASM instance() function")?;
136
137 let alloc = instance
139 .get_func(&mut store, "alloc")
140 .ok_or("Could not get the WASM alloc() function")?;
141
142 info!("Loaded wasm module from: '{source_url}'");
143
144 Ok(WasmExecutor {
145 store: Arc::new(Mutex::new(store)),
146 memory,
147 implementation,
148 alloc,
149 source_url: source_url.clone(),
150 })
151}
152
153#[cfg(test)]
154mod test {
155 use std::path::Path;
156 use url::Url;
157 use flowcore::content::file_provider::FileProvider;
158 use serde_json::json;
159 use flowcore::Implementation;
160
161 #[test]
162 fn load_test_wasm() {
163 let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/add.wasm");
164 let url = Url::from_file_path(path).expect("Could not convert path to Url");
165 let adder = &super::load(&FileProvider{}, &url)
166 .expect("Coudl not load test_wasm.wasm") as &dyn Implementation;
167
168 let inputs = vec![json!(1), json!(2)];
169 let (value, run_again) = adder.run(&inputs).expect("Could not call run");
170
171 assert_eq!(value, Some(json!(3)));
172 assert!(run_again);
173 }
174}