avm_server/
runner.rs

1/*
2 * AquaVM Workflow Engine
3 *
4 * Copyright (C) 2024 Fluence DAO
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Affero General Public License as
8 * published by the Free Software Foundation version 3 of the
9 * License.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU Affero General Public License for more details.
15 *
16 * You should have received a copy of the GNU Affero General Public License
17 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20use crate::RunnerError;
21use crate::RunnerResult;
22
23use air_interpreter_interface::try_as_string;
24use air_interpreter_interface::CallResultsRepr;
25use air_interpreter_interface::InterpreterOutcome;
26use air_interpreter_sede::ToSerialized;
27use air_utils::measure;
28use avm_interface::raw_outcome::RawAVMOutcome;
29use avm_interface::CallResults;
30use fluence_keypair::KeyPair;
31use marine::generic::Marine;
32use marine::generic::MarineConfig;
33use marine::generic::ModuleDescriptor;
34use marine::IValue;
35use marine_wasm_backend_traits::WasmBackend;
36
37use std::path::PathBuf;
38
39#[derive(Clone, Copy, Debug)]
40pub struct AquaVMRuntimeLimits {
41    pub air_size_limit: u64, // WIP remove pub?
42    /// The particle data size limit.
43    pub particle_size_limit: u64,
44    /// This is the limit for the size of service call result.
45    pub call_result_size_limit: u64,
46    /// This knob controls hard RAM limits behavior for AVMRunner.
47    pub hard_limit_enabled: bool,
48}
49
50#[derive(Default)]
51pub struct AVMRuntimeLimits {
52    // The AIR script size limit.
53    pub air_size_limit: Option<u64>,
54    /// The particle data size limit.
55    pub particle_size_limit: Option<u64>,
56    /// This is the limit for the size of service call result.
57    pub call_result_size_limit: Option<u64>,
58    /// This knob controls hard RAM limits behavior for AVMRunner.
59    pub hard_limit_enabled: bool,
60}
61
62pub struct AVMRunner<WB: WasmBackend> {
63    marine: Marine<WB>,
64    /// file name of the AIR interpreter .wasm
65    wasm_filename: String,
66    /// The memory limit provided by constructor
67    total_memory_limit: Option<u64>,
68    /// This struct contains runtime RAM allowance.
69    aquavm_runtime_limits: AquaVMRuntimeLimits,
70}
71
72/// Return statistic of AVM server Wasm module heap footprint.
73pub struct AVMMemoryStats {
74    /// Size of currently used linear memory in bytes.
75    /// Please note that linear memory contains not only heap, but globals, shadow stack and so on.
76    pub memory_size: usize,
77    /// Possibly set max memory size for AVM server.
78    pub total_memory_limit: Option<u64>,
79    /// Number of allocations rejected due to memory limit.
80    /// May be not recorded by some backends in Marine.
81    pub allocation_rejects: Option<u32>,
82}
83
84impl<WB: WasmBackend> AVMRunner<WB> {
85    /// Create AVM with the provided config.
86    pub async fn new(
87        air_wasm_path: PathBuf,
88        total_memory_limit: Option<u64>,
89        avm_runtime_limits: AVMRuntimeLimits,
90        logging_mask: i32,
91        wasm_backend: WB,
92    ) -> RunnerResult<Self> {
93        let (wasm_dir, wasm_filename) = split_dirname(air_wasm_path)?;
94
95        let marine_config =
96            make_marine_config(wasm_dir, &wasm_filename, total_memory_limit, logging_mask);
97        let marine = Marine::with_raw_config(wasm_backend, marine_config).await?;
98        let aquavm_runtime_limits = avm_runtime_limits.into();
99
100        let avm = Self {
101            marine,
102            wasm_filename,
103            total_memory_limit,
104            aquavm_runtime_limits,
105        };
106
107        Ok(avm)
108    }
109
110    #[allow(clippy::too_many_arguments)]
111    #[tracing::instrument(skip_all)]
112    pub async fn call(
113        &mut self,
114        air: impl Into<String>,
115        prev_data: impl Into<Vec<u8>>,
116        data: impl Into<Vec<u8>>,
117        init_peer_id: impl Into<String>,
118        timestamp: u64,
119        ttl: u32,
120        current_peer_id: impl Into<String>,
121        call_results: CallResults,
122        keypair: &KeyPair,
123        particle_id: String,
124    ) -> RunnerResult<RawAVMOutcome> {
125        let key_format = keypair.key_format();
126        // we use secret() for compatibility with JS client that doesn't have keypair type,
127        // it can serialize a secret key only
128        let secret_key_bytes: Vec<u8> = keypair.secret().map_err(RunnerError::KeyError)?;
129
130        let args = prepare_args(
131            air,
132            prev_data,
133            data,
134            current_peer_id.into(),
135            init_peer_id.into(),
136            timestamp,
137            ttl,
138            self.aquavm_runtime_limits,
139            call_results,
140            key_format.into(),
141            secret_key_bytes,
142            particle_id,
143        );
144
145        let result = measure!(
146            self.marine
147                .call_with_ivalues_async(&self.wasm_filename, "invoke", &args, <_>::default())
148                .await?,
149            tracing::Level::INFO,
150            "marine.call_with_ivalues",
151            method = "invoke",
152        );
153
154        let result = try_as_one_value_vec(result)?;
155        let outcome = InterpreterOutcome::from_ivalue(result)
156            .map_err(RunnerError::InterpreterResultDeError)?;
157        let outcome = RawAVMOutcome::from_interpreter_outcome(outcome)?;
158
159        Ok(outcome)
160    }
161
162    #[allow(clippy::too_many_arguments)]
163    #[tracing::instrument(skip_all)]
164    pub async fn call_tracing(
165        &mut self,
166        air: impl Into<String>,
167        prev_data: impl Into<Vec<u8>>,
168        data: impl Into<Vec<u8>>,
169        init_peer_id: impl Into<String>,
170        timestamp: u64,
171        ttl: u32,
172        current_peer_id: impl Into<String>,
173        call_results: CallResults,
174        tracing_params: String,
175        tracing_output_mode: u8,
176        key_format: u8,
177        secret_key_bytes: Vec<u8>,
178        particle_id: String,
179    ) -> RunnerResult<RawAVMOutcome> {
180        let mut args = prepare_args(
181            air,
182            prev_data,
183            data,
184            current_peer_id.into(),
185            init_peer_id.into(),
186            timestamp,
187            ttl,
188            self.aquavm_runtime_limits,
189            call_results,
190            key_format,
191            secret_key_bytes,
192            particle_id,
193        );
194        args.push(IValue::String(tracing_params));
195        args.push(IValue::U8(tracing_output_mode));
196
197        let result = measure!(
198            self.marine
199                .call_with_ivalues_async(
200                    &self.wasm_filename,
201                    "invoke_tracing",
202                    &args,
203                    <_>::default(),
204                )
205                .await?,
206            tracing::Level::INFO,
207            "marine.call_with_ivalues",
208            method = "invoke_tracing",
209        );
210
211        let result = try_as_one_value_vec(result)?;
212        let outcome = InterpreterOutcome::from_ivalue(result)
213            .map_err(RunnerError::InterpreterResultDeError)?;
214        let outcome = RawAVMOutcome::from_interpreter_outcome(outcome)?;
215
216        Ok(outcome)
217    }
218
219    pub async fn to_human_readable_data<'this>(
220        &'this mut self,
221        data: Vec<u8>,
222    ) -> RunnerResult<String> {
223        let args = vec![IValue::ByteArray(data)];
224
225        let result = self
226            .marine
227            .call_with_ivalues_async(
228                &self.wasm_filename,
229                "to_human_readable_data",
230                &args,
231                <_>::default(),
232            )
233            .await?;
234        let result = try_as_one_value_vec(result)?;
235        let outcome = try_as_string(result, "result").map_err(RunnerError::Aux)?;
236        Ok(outcome)
237    }
238
239    pub fn memory_stats(&self) -> AVMMemoryStats {
240        let stats = self.marine.module_memory_stats();
241
242        // only the interpreters must be loaded in Marine
243        debug_assert!(stats.modules.len() == 1);
244
245        AVMMemoryStats {
246            memory_size: stats.modules[0].memory_size,
247            total_memory_limit: self.total_memory_limit,
248            allocation_rejects: stats.allocation_stats.map(|stats| stats.allocation_rejects),
249        }
250    }
251}
252
253#[allow(clippy::too_many_arguments)]
254#[tracing::instrument(skip(air, prev_data, data, call_results, secret_key_bytes))]
255fn prepare_args(
256    air: impl Into<String>,
257    prev_data: impl Into<Vec<u8>>,
258    data: impl Into<Vec<u8>>,
259    current_peer_id: String,
260    init_peer_id: String,
261    timestamp: u64,
262    ttl: u32,
263    aquavm_runtime_limits: AquaVMRuntimeLimits,
264    call_results: CallResults,
265    key_format: u8,
266    secret_key_bytes: Vec<u8>,
267    particle_id: String,
268) -> Vec<IValue> {
269    let AquaVMRuntimeLimits {
270        air_size_limit,
271        particle_size_limit,
272        call_result_size_limit,
273        hard_limit_enabled,
274    } = aquavm_runtime_limits;
275
276    let run_parameters = air_interpreter_interface::RunParameters::new(
277        init_peer_id,
278        current_peer_id,
279        timestamp,
280        ttl,
281        key_format,
282        secret_key_bytes,
283        particle_id,
284        air_size_limit,
285        particle_size_limit,
286        call_result_size_limit,
287        hard_limit_enabled,
288    )
289    .into_ivalue();
290
291    let call_results = avm_interface::into_raw_result(call_results);
292    let call_results = measure!(
293        CallResultsRepr
294            .serialize(&call_results)
295            .expect("the default serializer shouldn't fail"),
296        tracing::Level::INFO,
297        "CallResultsRepr.serialize"
298    );
299
300    vec![
301        IValue::String(air.into()),
302        IValue::ByteArray(prev_data.into()),
303        IValue::ByteArray(data.into()),
304        run_parameters,
305        IValue::ByteArray(call_results.to_vec()),
306    ]
307}
308
309/// Splits given path into its directory and file name
310///
311/// # Example
312/// For path `/path/to/air_interpreter_server.wasm` result will be `Ok(PathBuf(/path/to), "air_interpreter_server.wasm")`
313fn split_dirname(path: PathBuf) -> RunnerResult<(PathBuf, String)> {
314    use RunnerError::InvalidAIRPath;
315
316    let metadata = path.metadata().map_err(|err| InvalidAIRPath {
317        invalid_path: path.clone(),
318        reason: "failed to get file's metadata (doesn't exist or invalid permissions)",
319        io_error: Some(err),
320    })?;
321
322    if !metadata.is_file() {
323        return Err(InvalidAIRPath {
324            invalid_path: path,
325            reason: "is not a file",
326            io_error: None,
327        });
328    }
329
330    let file_name = path
331        .file_name()
332        .expect("checked to be a file, file name must be defined");
333    let file_name = file_name.to_string_lossy().into_owned();
334
335    let mut path = path;
336    // drop file name from path
337    path.pop();
338
339    Ok((path, file_name))
340}
341
342fn make_marine_config<WB: WasmBackend>(
343    air_wasm_dir: PathBuf,
344    air_wasm_file: &str,
345    total_memory_limit: Option<u64>,
346    logging_mask: i32,
347) -> MarineConfig<WB> {
348    let air_module_config = marine::generic::MarineModuleConfig::<WB> {
349        logger_enabled: true,
350        host_imports: <_>::default(),
351        wasi: None,
352        logging_mask,
353    };
354
355    MarineConfig {
356        modules_dir: Some(air_wasm_dir),
357        total_memory_limit,
358        modules_config: vec![ModuleDescriptor {
359            load_from: None,
360            file_name: String::from(air_wasm_file),
361            import_name: String::from(air_wasm_file),
362            config: air_module_config,
363        }],
364        default_modules_config: None,
365    }
366}
367
368fn try_as_one_value_vec(mut ivalues: Vec<IValue>) -> RunnerResult<IValue> {
369    use RunnerError::IncorrectInterpreterResult;
370
371    if ivalues.len() != 1 {
372        return Err(IncorrectInterpreterResult(ivalues));
373    }
374
375    Ok(ivalues.remove(0))
376}
377
378impl AquaVMRuntimeLimits {
379    pub fn new(
380        air_size_limit: u64,
381        particle_size_limit: u64,
382        call_result_size_limit: u64,
383        hard_limit_enabled: bool,
384    ) -> Self {
385        Self {
386            air_size_limit,
387            particle_size_limit,
388            call_result_size_limit,
389            hard_limit_enabled,
390        }
391    }
392}
393
394impl AVMRuntimeLimits {
395    pub fn new(
396        air_size_limit: Option<u64>,
397        particle_size_limit: Option<u64>,
398        call_result_size_limit: Option<u64>,
399        hard_limit_enabled: bool,
400    ) -> Self {
401        Self {
402            air_size_limit,
403            particle_size_limit,
404            call_result_size_limit,
405            hard_limit_enabled,
406        }
407    }
408}
409
410impl From<AVMRuntimeLimits> for AquaVMRuntimeLimits {
411    fn from(value: AVMRuntimeLimits) -> Self {
412        use air_interpreter_interface::MAX_AIR_SIZE;
413        use air_interpreter_interface::MAX_CALL_RESULT_SIZE;
414        use air_interpreter_interface::MAX_PARTICLE_SIZE;
415
416        AquaVMRuntimeLimits::new(
417            value.air_size_limit.unwrap_or(MAX_AIR_SIZE),
418            value.particle_size_limit.unwrap_or(MAX_PARTICLE_SIZE),
419            value.call_result_size_limit.unwrap_or(MAX_CALL_RESULT_SIZE),
420            value.hard_limit_enabled,
421        )
422    }
423}