1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#[cfg(not(target_arch = "wasm32"))]
use std::cmp::max;
#[cfg(not(target_arch = "wasm32"))]
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use std::sync::Mutex;

use flow_impl::{Implementation, RunAgain};
use serde_json::Value;
#[cfg(not(target_arch = "wasm32"))]
use wasmi::{ExternVal, ImportsBuilder, MemoryRef, Module, ModuleInstance, ModuleRef,
            NopExternals, RuntimeValue, Signature, ValueType};

use crate::errors::*;
use crate::provider::Provider;

// Default file name handed to `Provider::resolve_url()` by `load()` when resolving the url
// of the wasm module
#[cfg(not(target_arch = "wasm32"))]
const DEFAULT_WASM_FILENAME: &str = "module";

// Size in bytes used in two places: the minimum size of the buffer requested from the wasm
// module's `alloc` function, and the maximum result length accepted back from `run_wasm`
#[cfg(not(target_arch = "wasm32"))]
const MAX_RESULT_SIZE: i32 = 1024;

// Placeholder executor used when this crate is itself compiled to wasm32: it holds no module
// or memory, and its `run()` does nothing
#[cfg(target_arch = "wasm32")]
#[derive(Debug)]
pub struct WasmExecutor;

// Executor for a loaded wasm module on non-wasm32 targets. Both members are locked by `run()`
// for the duration of each invocation.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub struct WasmExecutor {
    // The instantiated wasm module whose exported functions are invoked
    module: Arc<Mutex<ModuleRef>>,
    // The module's exported `memory`, used to pass input data in and read result data out
    memory: Arc<Mutex<MemoryRef>>,
}

#[cfg(not(target_arch = "wasm32"))]
impl WasmExecutor {
    pub fn new(module_ref: ModuleRef, memory: MemoryRef) -> Self {
        WasmExecutor {
            module: Arc::new(Mutex::new(module_ref)),
            memory: Arc::new(Mutex::new(memory)),
        }
    }
}

// Manually assert that `WasmExecutor` can be sent to and shared between threads.
// NOTE(review): the native variant only touches its module and memory while holding their
// mutexes in `run()`, which is presumably the justification for these impls - but the
// compiler cannot check it. Confirm that the wasmi `ModuleRef` / `MemoryRef` handles are
// never cloned out and used from another thread without the locks.
unsafe impl Send for WasmExecutor {}
unsafe impl Sync for WasmExecutor {}

/*
    Ask the wasm module's exported `alloc` function for a buffer and copy `bytes` into it,
    returning the buffer's address inside the module's memory.

    The requested size is never less than MAX_RESULT_SIZE, even for shorter input.

    Panics if the call to `alloc` fails or returns nothing, or if the copy into memory fails.
    Returns 0 (without copying anything) if `alloc` returns something other than an I32.
*/
#[cfg(not(target_arch = "wasm32"))]
fn send_byte_array(instance: &ModuleRef, memory: &MemoryRef, bytes: &[u8]) -> u32 {
    let requested_size = max(bytes.len() as i32, MAX_RESULT_SIZE);

    let returned = instance
        .invoke_export("alloc", &[RuntimeValue::I32(requested_size)], &mut NopExternals)
        .unwrap()
        .unwrap();

    if let RuntimeValue::I32(pointer) = returned {
        let address = pointer as u32;
        memory.set(address, bytes).unwrap();
        address
    } else {
        0
    }
}

#[cfg(not(target_arch = "wasm32"))]
impl Implementation for WasmExecutor {
    fn run(&self, inputs: Vec<Vec<Value>>) -> (Option<Value>, RunAgain) {
        #[cfg(not(target_arch = "wasm32"))]
            let module_ref = self.module.lock().unwrap();
        let memory_ref = self.memory.lock().unwrap();

        // setup module memory with the serde serialization of `inputs: Vec<Vec<Value>>`
        let input_data = serde_json::to_vec(&inputs).unwrap();

        trace!("Running the exported function 'run_wasm' on input_data '{}'", String::from_utf8(input_data.clone()).unwrap());

        // Allocate a string for the input data inside wasm module
        let input_data_wasm_ptr = send_byte_array(&module_ref, &memory_ref, &input_data);

        let result = module_ref.invoke_export("run_wasm",
                                              &[RuntimeValue::I32(input_data_wasm_ptr as i32),
                                                  RuntimeValue::I32(input_data.len() as i32), ], &mut NopExternals);

        match result {
            Ok(value) => {
                match value.unwrap() {
                    RuntimeValue::I32(result_length) => {
                        trace!("Return length from wasm function of {}", result_length);
                        if result_length > MAX_RESULT_SIZE {
                            error!("Return length from wasm function of {} exceed maximum allowed", result_length);
                            (None, true)
                        } else {
                            let result_data = memory_ref.get(input_data_wasm_ptr, result_length as usize).unwrap();
                            let (result, run_again) = serde_json::from_slice(result_data.as_slice()).unwrap();
                            (result, run_again)
                        }
                    }
                    _ => {
                        error!("Unexpected return value from wasm function on invoke_export()");
                        (None, true)
                    }
                }
            }
            Err(err) => {
                error!("Error returned by Wasm invoke_export(): {:?}", err);
                (None, true)
            }
        }
    }
}

// Stub implementation used when this crate is itself compiled to wasm32: it ignores its
// inputs and always returns no value and `false` for run again
#[cfg(target_arch = "wasm32")]
impl Implementation for WasmExecutor {
    fn run(&self, _inputs: Vec<Vec<Value>>) -> (Option<Value>, RunAgain) {
        (None, false)
    }
}

/*
    Load a wasm module from the specified url and wrap it in a `WasmExecutor`.

    The url is resolved via `provider` (using DEFAULT_WASM_FILENAME and the "wasm" extension),
    the content fetched, parsed and instantiated with no imports.

    Returns an error if:
      - the url cannot be resolved or its contents cannot be fetched
      - the content is not a valid wasm module, or it cannot be instantiated
      - the module does not export a memory named `memory`
      - the module does not export the functions required by `check_required_functions()`

    NOTE: `assert_no_start()` is still used, so presumably a module that declares a start
    function causes a panic rather than an error - confirm against the wasmi documentation.
*/
#[cfg(not(target_arch = "wasm32"))]
pub fn load(provider: &dyn Provider, source_url: &str) -> Result<WasmExecutor> {
    let (resolved_url, _) = provider.resolve_url(&source_url, DEFAULT_WASM_FILENAME, &["wasm"])?;
    let content = provider.get_contents(&resolved_url)?;
    let module = Module::from_buffer(content)
        .chain_err(|| format!("Could not create Wasm Module from content in '{}'", resolved_url))?;

    let module_ref = ModuleInstance::new(&module, &ImportsBuilder::default())
        .chain_err(|| "Could not create new ModuleInstance when loading WASM content")?
        .assert_no_start();

    // A module without a usable `memory` export is an error in the file being loaded, so
    // report it to the caller instead of panicking
    let memory_export = module_ref.export_by_name("memory")
        .ok_or_else(|| format!("No export named 'memory' found in wasm file '{}'",
                               resolved_url))?;
    let memory = memory_export.as_memory()
        .ok_or_else(|| format!("Export named 'memory' is not of memory type in wasm file '{}'",
                               resolved_url))?
        .to_owned();

    check_required_functions(&module_ref, &resolved_url)?;

    info!("Loaded wasm module from: '{}'", source_url);

    Ok(WasmExecutor::new(module_ref, memory))
}

/*
    Check that the module exports each function this executor invokes, with the expected
    signature:
      - `alloc`: (I32) -> I32
      - `run_wasm`: (I32, I32) -> I32

    Returns an error for the first export that is missing, is not a function, or has a
    different signature. `filename` is only used in the error message for a missing export.
*/
#[cfg(not(target_arch = "wasm32"))]
fn check_required_functions(module_ref: &ModuleRef, filename: &str) -> Result<()> {
    let expected_exports = [
        ("alloc", Signature::new(&[ValueType::I32][..], Some(ValueType::I32))),
        ("run_wasm", Signature::new(&[ValueType::I32, ValueType::I32][..], Some(ValueType::I32))),
    ];

    for (function_name, signature) in expected_exports.iter() {
        let export = module_ref.export_by_name(function_name)
            .ok_or_else(|| format!("No function named '{}' found in wasm file '{}'",
                                   function_name, filename))?;

        if let ExternVal::Func(function_ref) = export {
            let sig = function_ref.signature();
            if sig != signature {
                bail!("Expected function signature '{:?}' and found signature '{:?}'",
                        signature, sig);
            }
        } else {
            bail!("Exported value was not a function");
        }
    }

    Ok(())
}

/*
    Stub used when the target architecture is wasm32, where everything (including this crate)
    is compiled to wasm: no module is loaded, the provider and url are ignored and a
    placeholder `WasmExecutor` is always returned.
*/
#[cfg(target_arch = "wasm32")]
pub fn load(_provider: &dyn Provider, _source_url: &str) -> Result<WasmExecutor> {
    Ok(WasmExecutor {})
}