flowrlib/
wasm.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
use std::cmp::max;
use std::sync::{Arc, Mutex};

use log::info;
use log::trace;
use serde_json::Value;
use url::Url;
use wasmtime::{Func, Instance, Memory, Module, Store, Val};

use flowcore::{Implementation, RunAgain};
use flowcore::errors::*;
use flowcore::provider::Provider;

const DEFAULT_WASM_FILENAME: &str = "module";

const MAX_RESULT_SIZE: i32 = 1024;

#[derive(Debug)]
pub struct WasmExecutor {
    store: Arc<Mutex<Store<()>>>,
    memory: Memory,
    implementation: Func,
    alloc: Func,
    source_url: Url,
}

impl WasmExecutor {
    // Serialize the inputs into JSON and then write them into the linear memory for WASM to read
    // Return the offset of the data in linear memory and the data size in bytes
    fn send_inputs(&self, store: &mut Store<()>, inputs: &[Value]) -> Result<(i32, i32)> {
        let input_data = serde_json::to_vec(&inputs)?;
        let alloc_size = max(input_data.len() as i32, MAX_RESULT_SIZE);
        let offset = self.alloc(alloc_size, store)?;
        self.memory
            .write(store, offset as usize, &input_data)
            .map_err(|_| "Could not write to WASM Linear Memory")?;
        Ok((offset, input_data.len() as i32))
    }

    // Call the "alloc" wasm function
    // - `length` is the length of block of memory to allocate
    // - returns the offset to the allocated memory
    fn alloc(&self, length: i32, store: &mut Store<()>) -> Result<i32> {
        let mut results: [Val;1] = [Val::I32(0)];
        let params = [Val::I32(length)];
        self.alloc.call(store, &params, &mut results)
            .map_err(|_| "WASM alloc() call failed")?;

        match results[0] {
            Val::I32(offset) => Ok(offset),
            _ => bail!("WASM alloc() failed"),
        }
    }

    // Call the "implementation" wasm function
    // - `offset` is the offset to the input values (json), and the length of the json
    // - `length` is the length of the input json
    // - returns the length of the resulting json, at the same offset
    fn call(&self, offset: i32, length: i32, store: &mut Store<()>) -> Result<i32> {
        let mut results: [Val;1] = [Val::I32(0)];
        let params = [Val::I32(offset), Val::I32(length)];
        self.implementation
            .call(store, &params, &mut results)
            .map_err(|_| format!("Error returned by WASM implementation.call() for {:?}",
            self.source_url))?;

        match results[0] {
            Val::I32(result_length) => {
                trace!("Return length from wasm function of {}", result_length);
                if result_length > MAX_RESULT_SIZE {
                    bail!(
                    "Return length from wasm function of {} exceed maximum allowed",
                    result_length
                    );
                }
                Ok(result_length)
            },
            _ => bail!(format!("Unexpected value returned by WASM Func.call()()"))
        }
    }

    fn get_result(
        &self,
        result_length: i32,
        offset: usize,
        store: &mut Store<()>,
    ) -> Result<(Option<Value>, RunAgain)> {
        let mut buffer: Vec<u8> = vec![0u8; result_length as usize];
        self
            .memory
            .read(store, offset, &mut buffer)
            .map_err(|_| "could not read return value from WASM linear memory")?;

        let result_returned = serde_json::from_slice(buffer.as_slice())
            .chain_err(|| "Could not convert returned data from wasm to json")?;
        trace!("WASM run() function invocation Result = {:?}", result_returned);
        result_returned
    }
}


impl Implementation for WasmExecutor {
    fn run(&self, inputs: &[Value]) -> Result<(Option<Value>, RunAgain)> {
        let mut store = self.store.lock().map_err(|_| "Could not lock WASM store")?;
        let (offset, length) = self.send_inputs(&mut store, inputs)?;
        let result_length = self.call(offset, length, &mut store)?;
        self.get_result(result_length, offset as usize, &mut store)
    }
}

unsafe impl Send for WasmExecutor {}

unsafe impl Sync for WasmExecutor {}

/// load a Wasm module from the specified Url and return it wrapped in a WasmExecutor `Implementation`
pub fn load(provider: &dyn Provider, source_url: &Url) -> Result<WasmExecutor> {
    trace!("Attempting to load WASM module from '{}'", source_url);
    let (resolved_url, _) = provider
        .resolve_url(source_url, DEFAULT_WASM_FILENAME, &["wasm"])
        .chain_err(|| format!("Could not resolve url '{source_url}' for wasm file"))?;
    let content = provider.get_contents(&resolved_url).chain_err(|| {
        format!("Could not fetch content from url '{resolved_url}' for loading wasm")
    })?;

    let mut store: Store<()> = Store::default();
    let module = Module::new(store.engine(), content)
        .map_err(|e| format!("Could not create WASM Module: {e}"))?;
    let instance = Instance::new(&mut store, &module, &[])
        .map_err(|e| format!("Could not create WASM Instance: {e}"))?;
    let memory = instance
        .get_memory(&mut store, "memory")
        .ok_or("Could not get WASM linear memory")?;
    let implementation = instance
        .get_func(&mut store, "run_wasm")
        .ok_or("Could not get the WASM instance() function")?;

    // TODO get typed function
    let alloc = instance
        .get_func(&mut store, "alloc")
        .ok_or("Could not get the WASM alloc() function")?;

    info!("Loaded wasm module from: '{source_url}'");

    Ok(WasmExecutor {
        store: Arc::new(Mutex::new(store)),
        memory,
        implementation,
        alloc,
        source_url: source_url.clone(),
    })
}

#[cfg(test)]
mod test {
    use std::path::Path;

    use serde_json::json;
    use url::Url;

    use flowcore::content::file_provider::FileProvider;
    use flowcore::Implementation;

    #[test]
    fn load_test_wasm() {
        let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/add.wasm");
        let url = Url::from_file_path(path).expect("Could not convert path to Url");
        let adder = &super::load(&FileProvider{}, &url)
            .expect("Could not load test_wasm.wasm") as &dyn Implementation;

        let inputs = vec![json!(1), json!(2)];
        let (value, run_again) = adder.run(&inputs).expect("Could not call run");

        assert_eq!(value, Some(json!(3)));
        assert!(run_again);
    }
}