flowrlib/
wasm.rs

1use std::cmp::max;
2use std::sync::{Arc, Mutex};
3
4use log::info;
5use log::trace;
6use serde_json::Value;
7use url::Url;
8use wasmtime::*;
9
10use flowcore::{Implementation, RunAgain};
11use flowcore::errors::*;
12use flowcore::provider::Provider;
13
14const DEFAULT_WASM_FILENAME: &str = "module";
15
16const MAX_RESULT_SIZE: i32 = 1024;
17
18#[derive(Debug)]
19pub struct WasmExecutor {
20    store: Arc<Mutex<Store<()>>>,
21    memory: Memory,
22    implementation: Func,
23    alloc: Func,
24    source_url: Url,
25}
26
27impl WasmExecutor {
28    // Serialize the inputs into JSON and then write them into the linear memory for WASM to read
29    // Return the offset of the data in linear memory and the data size in bytes
30    fn send_inputs(&self, store: &mut Store<()>, inputs: &[Value]) -> Result<(i32, i32)> {
31        let input_data = serde_json::to_vec(&inputs)?;
32        let alloc_size = max(input_data.len() as i32, MAX_RESULT_SIZE);
33        let offset = self.alloc(alloc_size, store)?;
34        self.memory
35            .write(store, offset as usize, &input_data)
36            .map_err(|_| "Could not write to WASM Linear Memory")?;
37        Ok((offset, input_data.len() as i32))
38    }
39
40    // Call the "alloc" wasm function
41    // - `length` is the length of block of memory to allocate
42    // - returns the offset to the allocated memory
43    fn alloc(&self, length: i32, store: &mut Store<()>) -> Result<i32> {
44        let mut results: [Val;1] = [Val::I32(0)];
45        let params = [Val::I32(length)];
46        self.alloc.call(store, &params, &mut results)
47            .map_err(|_| "WASM alloc() call failed")?;
48
49        match results[0] {
50            Val::I32(offset) => Ok(offset),
51            _ => bail!("WASM alloc() failed"),
52        }
53    }
54
55    // Call the "implementation" wasm function
56    // - `offset` is the offset to the input values (json), and the length of the json
57    // - `length` is the length of the input json
58    // - returns the length of the resulting json, at the same offset
59    fn call(&self, offset: i32, length: i32, store: &mut Store<()>) -> Result<i32> {
60        let mut results: [Val;1] = [Val::I32(0)];
61        let params = [Val::I32(offset), Val::I32(length)];
62        self.implementation
63            .call(store, &params, &mut results)
64            .map_err(|_| format!("Error returned by WASM implementation.call() for {:?}",
65            self.source_url))?;
66
67        match results[0] {
68            Val::I32(result_length) => {
69                trace!("Return length from wasm function of {}", result_length);
70                if result_length > MAX_RESULT_SIZE {
71                    bail!(
72                    "Return length from wasm function of {} exceed maximum allowed",
73                    result_length
74                    );
75                }
76                Ok(result_length)
77            },
78            _ => bail!(format!("Unexpected value returned by WASM Func.call()()"))
79        }
80    }
81
82    fn get_result(
83        &self,
84        result_length: i32,
85        offset: usize,
86        store: &mut Store<()>,
87    ) -> Result<(Option<Value>, RunAgain)> {
88        let mut buffer: Vec<u8> = vec![0u8; result_length as usize];
89        self
90            .memory
91            .read(store, offset, &mut buffer)
92            .map_err(|_| "could not read return value from WASM linear memory")?;
93
94        let result_returned = serde_json::from_slice(buffer.as_slice())
95            .chain_err(|| "Could not convert returned data from wasm to json")?;
96        trace!("WASM run() function invocation Result = {:?}", result_returned);
97        result_returned
98    }
99}
100
101
102impl Implementation for WasmExecutor {
103    fn run(&self, inputs: &[Value]) -> Result<(Option<Value>, RunAgain)> {
104        let mut store = self.store.lock().map_err(|_| "Could not lock WASM store")?;
105        let (offset, length) = self.send_inputs(&mut store, inputs)?;
106        let result_length = self.call(offset, length, &mut store)?;
107        self.get_result(result_length, offset as usize, &mut store)
108    }
109}
110
111unsafe impl Send for WasmExecutor {}
112
113unsafe impl Sync for WasmExecutor {}
114
115/// load a Wasm module from the specified Url and return it wrapped in a WasmExecutor `Implementation`
116pub fn load(provider: &dyn Provider, source_url: &Url) -> Result<WasmExecutor> {
117    trace!("Attempting to load WASM module from '{}'", source_url);
118    let (resolved_url, _) = provider
119        .resolve_url(source_url, DEFAULT_WASM_FILENAME, &["wasm"])
120        .chain_err(|| format!("Could not resolve url '{source_url}' for wasm file"))?;
121    let content = provider.get_contents(&resolved_url).chain_err(|| {
122        format!("Could not fetch content from url '{resolved_url}' for loading wasm")
123    })?;
124
125    let mut store: Store<()> = Store::default();
126    let module = Module::new(store.engine(), content)
127        .map_err(|e| format!("Could not create WASM Module: {e}"))?;
128    let instance = Instance::new(&mut store, &module, &[])
129        .map_err(|e| format!("Could not create WASM Instance: {e}"))?;
130    let memory = instance
131        .get_memory(&mut store, "memory")
132        .ok_or("Could not get WASM linear memory")?;
133    let implementation = instance
134        .get_func(&mut store, "run_wasm")
135        .ok_or("Could not get the WASM instance() function")?;
136
137    // TODO get typed function
138    let alloc = instance
139        .get_func(&mut store, "alloc")
140        .ok_or("Could not get the WASM alloc() function")?;
141
142    info!("Loaded wasm module from: '{source_url}'");
143
144    Ok(WasmExecutor {
145        store: Arc::new(Mutex::new(store)),
146        memory,
147        implementation,
148        alloc,
149        source_url: source_url.clone(),
150    })
151}
152
153#[cfg(test)]
154mod test {
155    use std::path::Path;
156    use url::Url;
157    use flowcore::content::file_provider::FileProvider;
158    use serde_json::json;
159    use flowcore::Implementation;
160
161    #[test]
162    fn load_test_wasm() {
163        let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/add.wasm");
164        let url = Url::from_file_path(path).expect("Could not convert path to Url");
165        let adder = &super::load(&FileProvider{}, &url)
166            .expect("Coudl not load test_wasm.wasm") as &dyn Implementation;
167
168        let inputs = vec![json!(1), json!(2)];
169        let (value, run_again) = adder.run(&inputs).expect("Could not call run");
170
171        assert_eq!(value, Some(json!(3)));
172        assert!(run_again);
173    }
174}