marine/
marine.rs

1/*
2 * Copyright 2020 Fluence Labs Limited
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17use crate::config::MarineConfig;
18use crate::marine_interface::MarineInterface;
19use crate::MarineError;
20use crate::MarineResult;
21use crate::IValue;
22use crate::IType;
23use crate::MemoryStats;
24use crate::module_loading::load_modules_from_fs;
25use crate::host_imports::logger::LoggerFilter;
26use crate::host_imports::logger::WASM_LOG_ENV_NAME;
27use crate::host_imports::call_parameters_v3_to_v0;
28use crate::host_imports::call_parameters_v3_to_v1;
29use crate::host_imports::call_parameters_v3_to_v2;
30use crate::json_to_marine_err;
31
32use marine_wasm_backend_traits::WasmBackend;
33#[cfg(feature = "raw-module-api")]
34use marine_wasm_backend_traits::WasiState;
35
36use marine_core::MError;
37use marine_core::generic::MarineCore;
38use marine_core::IFunctionArg;
39use marine_core::MarineCoreConfig;
40use marine_core::MRecordTypes;
41use marine_utils::SharedString;
42use marine_rs_sdk::CallParameters;
43
44use parking_lot::Mutex;
45use serde_json::Value as JValue;
46
47use std::convert::TryInto;
48use std::collections::HashMap;
49use std::path::PathBuf;
50use std::sync::Arc;
51
52type MFunctionSignature = (Arc<Vec<IFunctionArg>>, Arc<Vec<IType>>);
53type MModuleInterface = (Arc<Vec<IFunctionArg>>, Arc<Vec<IType>>, Arc<MRecordTypes>);
54
55struct ModuleInterface {
56    function_signatures: HashMap<SharedString, MFunctionSignature>,
57    record_types: Arc<MRecordTypes>,
58}
59
60pub struct Marine<WB: WasmBackend> {
61    /// Marine instance.
62    core: MarineCore<WB>,
63
64    /// Parameters of call accessible by Wasm modules.
65    call_parameters_v0: Arc<Mutex<marine_call_parameters_v0::CallParameters>>,
66
67    call_parameters_v1: Arc<Mutex<marine_call_parameters_v1::CallParameters>>,
68
69    call_parameters_v2: Arc<Mutex<marine_call_parameters_v2::CallParameters>>,
70
71    /// Parameters of call accessible by Wasm modules.
72    call_parameters_v3: Arc<Mutex<CallParameters>>,
73
74    /// Cached module interfaces by names.
75    module_interfaces_cache: HashMap<String, ModuleInterface>,
76}
77
78impl<WB: WasmBackend> Marine<WB> {
79    /// Creates Marine from config deserialized from TOML.
80    pub async fn with_raw_config<C>(backend: WB, config: C) -> MarineResult<Self>
81    where
82        C: TryInto<MarineConfig<WB>>,
83        MarineError: From<C::Error>,
84    {
85        let config = config.try_into()?;
86        let modules = config
87            .modules_config
88            .iter()
89            .map(|m| -> MarineResult<(String, PathBuf)> {
90                Ok((m.import_name.clone(), m.get_path(&config.modules_dir)?))
91            })
92            .collect::<MarineResult<HashMap<String, PathBuf>>>()?;
93
94        Self::with_module_names::<MarineConfig<WB>>(backend, &modules, config).await
95    }
96
97    /// Creates Marine with given modules.
98    pub async fn with_modules<C>(
99        backend: WB,
100        mut modules: HashMap<String, Vec<u8>>,
101        config: C,
102    ) -> MarineResult<Self>
103    where
104        C: TryInto<MarineConfig<WB>>,
105        MarineError: From<C::Error>,
106    {
107        let config = config.try_into()?;
108        let core_config = MarineCoreConfig::new(backend, config.total_memory_limit);
109        let mut marine = MarineCore::new(core_config)?;
110        let call_parameters_v0 = Arc::<Mutex<marine_call_parameters_v0::CallParameters>>::default();
111        let call_parameters_v1 = Arc::<Mutex<marine_call_parameters_v1::CallParameters>>::default();
112        let call_parameters_v2 = Arc::<Mutex<marine_call_parameters_v2::CallParameters>>::default();
113        let call_parameters_v3 = Arc::<Mutex<CallParameters>>::default();
114
115        let modules_dir = config.modules_dir;
116
117        // LoggerFilter can be initialized with an empty string
118        let wasm_log_env = std::env::var(WASM_LOG_ENV_NAME).unwrap_or_default();
119        let logger_filter = LoggerFilter::from_env_string(&wasm_log_env);
120
121        for module in config.modules_config {
122            let module_bytes = modules.remove(&module.import_name).ok_or_else(|| {
123                MarineError::InstantiationError {
124                    module_import_name: module.import_name.clone(),
125                    modules_dir: modules_dir.clone(),
126                    provided_modules: modules.keys().cloned().collect::<Vec<_>>(),
127                }
128            })?;
129
130            let marine_module_config = crate::config::make_marine_config(
131                module.import_name.clone(),
132                Some(module.config),
133                call_parameters_v0.clone(),
134                call_parameters_v1.clone(),
135                call_parameters_v2.clone(),
136                call_parameters_v3.clone(),
137                &logger_filter,
138            )?;
139
140            marine
141                .load_module(module.import_name, &module_bytes, marine_module_config)
142                .await
143                .map_err(|e| check_for_oom_and_convert_error(&marine, e))?;
144        }
145
146        Ok(Self {
147            core: marine,
148            call_parameters_v0,
149            call_parameters_v1,
150            call_parameters_v2,
151            call_parameters_v3,
152            module_interfaces_cache: HashMap::new(),
153        })
154    }
155
156    /// Searches for modules in `config.modules_dir`, loads only those in the `names` set
157    pub async fn with_module_names<C>(
158        backend: WB,
159        names: &HashMap<String, PathBuf>,
160        config: C,
161    ) -> MarineResult<Self>
162    where
163        C: TryInto<MarineConfig<WB>>,
164        MarineError: From<C::Error>,
165    {
166        let config = config.try_into()?;
167        let modules = load_modules_from_fs(names)?;
168
169        Self::with_modules::<MarineConfig<WB>>(backend, modules, config).await
170    }
171
172    /// Call a specified function of loaded on a startup module by its name.
173    pub async fn call_with_ivalues_async(
174        &mut self,
175        module_name: impl AsRef<str>,
176        func_name: impl AsRef<str>,
177        args: &[IValue],
178        call_parameters: marine_rs_sdk::CallParameters,
179    ) -> MarineResult<Vec<IValue>> {
180        self.update_call_parameters(call_parameters);
181
182        let result = self
183            .core
184            .call_async(module_name, func_name, args)
185            .await
186            .map_err(|e| check_for_oom_and_convert_error(&self.core, e))?;
187
188        self.core.clear_allocation_stats();
189
190        Ok(result)
191    }
192
193    /// Call a specified function of loaded on a startup module by its name.
194    pub async fn call_with_json_async(
195        &mut self,
196        module_name: impl AsRef<str>,
197        func_name: impl AsRef<str>,
198        json_args: JValue,
199        call_parameters: marine_rs_sdk::CallParameters,
200    ) -> MarineResult<JValue> {
201        use it_json_serde::json_to_ivalues;
202        use it_json_serde::ivalues_to_json;
203
204        let module_name = module_name.as_ref();
205        let func_name = func_name.as_ref();
206
207        let (func_signature, output_types, record_types) =
208            self.lookup_module_interface(module_name, func_name)?;
209        let iargs = json_to_marine_err!(
210            json_to_ivalues(
211                json_args,
212                func_signature.iter().map(|arg| (&arg.name, &arg.ty)),
213                &record_types,
214            ),
215            module_name.to_string(),
216            func_name.to_string()
217        )?;
218
219        self.update_call_parameters(call_parameters);
220
221        let result = self
222            .core
223            .call_async(module_name, func_name, &iargs)
224            .await
225            .map_err(|e| check_for_oom_and_convert_error(&self.core, e))?;
226
227        self.core.clear_allocation_stats();
228
229        json_to_marine_err!(
230            ivalues_to_json(result, &output_types, &record_types),
231            module_name.to_string(),
232            func_name.to_string()
233        )
234    }
235
236    /// Return all export functions (name and signatures) of loaded modules.
237    pub fn get_interface(&self) -> MarineInterface<'_> {
238        let modules = self.core.interface().collect();
239
240        MarineInterface { modules }
241    }
242
243    /// Return statistic of Wasm modules heap footprint.
244    pub fn module_memory_stats(&self) -> MemoryStats<'_> {
245        self.core.module_memory_stats()
246    }
247
248    /// At first, tries to find function signature and record types in module_interface_cache,
249    /// if there is no them, tries to look
250    fn lookup_module_interface(
251        &mut self,
252        module_name: &str,
253        func_name: &str,
254    ) -> MarineResult<MModuleInterface> {
255        use MarineError::NoSuchModule;
256        use MarineError::MissingFunctionError;
257
258        if let Some(module_interface) = self.module_interfaces_cache.get(module_name) {
259            if let Some(function) = module_interface.function_signatures.get(func_name) {
260                return Ok((
261                    function.0.clone(),
262                    function.1.clone(),
263                    module_interface.record_types.clone(),
264                ));
265            }
266
267            return Err(MissingFunctionError(func_name.to_string()));
268        }
269
270        let module_interface = self
271            .core
272            .module_interface(module_name)
273            .ok_or_else(|| NoSuchModule(module_name.to_string()))?;
274
275        let function_signatures = module_interface
276            .function_signatures
277            .iter()
278            .cloned()
279            .map(|f| (SharedString(f.name), (f.arguments, f.outputs)))
280            .collect::<HashMap<_, _>>();
281
282        let (arg_types, output_types) = function_signatures
283            .get(func_name)
284            .ok_or_else(|| MissingFunctionError(func_name.to_string()))?;
285
286        let arg_types = arg_types.clone();
287        let output_types = output_types.clone();
288        let record_types = Arc::new(module_interface.record_types.clone());
289
290        let module_interface = ModuleInterface {
291            function_signatures,
292            record_types: record_types.clone(),
293        };
294
295        self.module_interfaces_cache
296            .insert(func_name.to_string(), module_interface);
297
298        Ok((arg_types, output_types, record_types))
299    }
300
301    fn update_call_parameters(&mut self, call_parameters: CallParameters) {
302        {
303            // a separate code block to unlock the mutex ASAP and to avoid double locking
304            let mut cp = self.call_parameters_v0.lock();
305            *cp = call_parameters_v3_to_v0(call_parameters.clone());
306        }
307
308        {
309            // a separate code block to unlock the mutex ASAP and to avoid double locking
310            let mut cp = self.call_parameters_v1.lock();
311            *cp = call_parameters_v3_to_v1(call_parameters.clone());
312        }
313
314        {
315            // a separate code block to unlock the mutex ASAP and to avoid double locking
316            let mut cp = self.call_parameters_v2.lock();
317            *cp = call_parameters_v3_to_v2(call_parameters.clone());
318        }
319
320        {
321            // a separate code block to unlock the mutex ASAP and to avoid double locking
322            let mut cp = self.call_parameters_v3.lock();
323            *cp = call_parameters;
324        }
325    }
326}
327
328// This API is intended for testing purposes (mostly in Marine REPL)
329#[cfg(feature = "raw-module-api")]
330impl<WB: WasmBackend> Marine<WB> {
331    pub async fn load_module<C, S>(
332        &mut self,
333        name: S,
334        wasm_bytes: &[u8],
335        config: Option<C>,
336    ) -> MarineResult<()>
337    where
338        S: Into<String>,
339        C: TryInto<crate::generic::MarineModuleConfig<WB>>,
340        MarineError: From<C::Error>,
341    {
342        let config = config.map(|c| c.try_into()).transpose()?;
343        let name = name.into();
344
345        // LoggerFilter can be initialized with an empty string
346        let wasm_log_env = std::env::var(WASM_LOG_ENV_NAME).unwrap_or_default();
347        let logger_filter = LoggerFilter::from_env_string(&wasm_log_env);
348
349        let marine_module_config = crate::config::make_marine_config(
350            name.clone(),
351            config,
352            self.call_parameters_v0.clone(),
353            self.call_parameters_v1.clone(),
354            self.call_parameters_v2.clone(),
355            self.call_parameters_v3.clone(),
356            &logger_filter,
357        )?;
358        self.core
359            .load_module(name, wasm_bytes, marine_module_config)
360            .await
361            .map_err(|e| check_for_oom_and_convert_error(&self.core, e))
362    }
363
364    pub fn unload_module(&mut self, module_name: impl AsRef<str>) -> MarineResult<()> {
365        self.core.unload_module(module_name).map_err(Into::into)
366    }
367
368    pub fn module_wasi_state(
369        &mut self,
370        module_name: impl AsRef<str>,
371    ) -> MarineResult<Box<dyn WasiState + '_>> {
372        let module_name = module_name.as_ref();
373
374        self.core
375            .module_wasi_state(module_name)
376            .ok_or_else(|| MarineError::NoSuchModule(module_name.to_string()))
377    }
378}
379
380fn check_for_oom_and_convert_error<WB: WasmBackend>(
381    core: &MarineCore<WB>,
382    error: MError,
383) -> MarineError {
384    let allocation_stats = match core.module_memory_stats().allocation_stats {
385        Some(allocation_stats) => allocation_stats,
386        _ => return error.into(),
387    };
388
389    if allocation_stats.allocation_rejects == 0 {
390        return error.into();
391    }
392
393    match error {
394        MError::ITInstructionError(_)
395        | MError::HostImportError(_)
396        | MError::WasmBackendError(_) => MarineError::HighProbabilityOOM {
397            allocation_stats,
398            original_error: error,
399        },
400        _ => error.into(),
401    }
402}