avm_server/
avm.rs

1/*
2 * AquaVM Workflow Engine
3 *
4 * Copyright (C) 2024 Fluence DAO
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Affero General Public License as
8 * published by the Free Software Foundation version 3 of the
9 * License.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU Affero General Public License for more details.
15 *
16 * You should have received a copy of the GNU Affero General Public License
17 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20use super::avm_runner::AVMRunner;
21use super::AVMDataStore;
22use super::AVMError;
23use super::AVMMemoryStats;
24use crate::config::AVMConfig;
25use crate::AVMResult;
26
27use avm_data_store::AnomalyData;
28use avm_interface::raw_outcome::RawAVMOutcome;
29use avm_interface::AVMOutcome;
30use avm_interface::CallResults;
31use avm_interface::ParticleParameters;
32use fluence_keypair::KeyPair;
33
34use marine_wasm_backend_traits::WasmBackend;
35
36use std::ops::Deref;
37use std::ops::DerefMut;
38use std::time::Duration;
39use std::time::Instant;
40
41/// A newtype needed to mark it as `unsafe impl Send`
42struct SendSafeRunner<WB: WasmBackend>(AVMRunner<WB>);
43
44/// Mark runtime as Send, so libp2p on the node (use-site) is happy
45unsafe impl<WB: WasmBackend> Send for SendSafeRunner<WB> {}
46
47impl<WB: WasmBackend> Deref for SendSafeRunner<WB> {
48    type Target = AVMRunner<WB>;
49
50    fn deref(&self) -> &Self::Target {
51        &self.0
52    }
53}
54impl<WB: WasmBackend> DerefMut for SendSafeRunner<WB> {
55    fn deref_mut(&mut self) -> &mut Self::Target {
56        &mut self.0
57    }
58}
59
60pub struct AVM<E, WB: WasmBackend> {
61    runner: SendSafeRunner<WB>,
62    data_store: AVMDataStore<E>,
63}
64
65impl<E, WB: WasmBackend> AVM<E, WB> {
66    /// Create AVM with provided config.
67    #[allow(clippy::result_large_err)]
68    pub async fn new(config: AVMConfig<E>, wasm_backend: WB) -> AVMResult<Self, E> {
69        let AVMConfig {
70            air_wasm_path,
71            max_heap_size,
72            logging_mask,
73            mut data_store,
74        } = config;
75
76        data_store.initialize()?;
77
78        let runner = AVMRunner::new(
79            air_wasm_path,
80            max_heap_size,
81            <_>::default(),
82            logging_mask,
83            wasm_backend,
84        )
85        .await
86        .map_err(AVMError::RunnerError)?;
87        let runner = SendSafeRunner(runner);
88        let avm = Self { runner, data_store };
89
90        Ok(avm)
91    }
92
93    #[allow(clippy::result_large_err)]
94    pub async fn call(
95        &mut self,
96        air: impl Into<String>,
97        data: impl Into<Vec<u8>>,
98        particle_parameters: ParticleParameters<'_>,
99        call_results: CallResults,
100        keypair: &KeyPair,
101    ) -> AVMResult<AVMOutcome, E> {
102        let air = air.into();
103        let prev_data = self.data_store.read_data(
104            &particle_parameters.particle_id,
105            &particle_parameters.current_peer_id,
106        )?;
107        let current_data = data.into();
108
109        let execution_start_time = Instant::now();
110        let memory_size_before = self.memory_stats().memory_size;
111        let outcome = self
112            .runner
113            .call(
114                air.clone(),
115                prev_data,
116                current_data.clone(),
117                particle_parameters.init_peer_id.clone().into_owned(),
118                particle_parameters.timestamp,
119                particle_parameters.ttl,
120                particle_parameters.current_peer_id.clone(),
121                call_results.clone(),
122                keypair,
123                particle_parameters.particle_id.to_string(),
124            )
125            .await
126            .map_err(AVMError::RunnerError)?;
127
128        let execution_time = execution_start_time.elapsed();
129        let memory_delta = self.memory_stats().memory_size - memory_size_before;
130        if self
131            .data_store
132            .detect_anomaly(execution_time, memory_delta, &outcome)
133        {
134            self.save_anomaly_data(
135                &air,
136                &current_data,
137                &call_results,
138                &particle_parameters,
139                &outcome,
140                execution_time,
141                memory_delta,
142            )?;
143        }
144
145        // persist resulted data
146        self.data_store.store_data(
147            &outcome.data,
148            &particle_parameters.particle_id,
149            &particle_parameters.current_peer_id,
150        )?;
151        let outcome = AVMOutcome::from_raw_outcome(outcome, memory_delta, execution_time)
152            .map_err(AVMError::InterpreterFailed)?;
153
154        Ok(outcome)
155    }
156
157    /// Cleanup data that become obsolete.
158    #[allow(clippy::result_large_err)]
159    pub fn cleanup_data(&mut self, particle_id: &str, current_peer_id: &str) -> AVMResult<(), E> {
160        self.data_store.cleanup_data(particle_id, current_peer_id)?;
161        Ok(())
162    }
163
164    /// Return memory stat of an interpreter heap.
165    pub fn memory_stats(&self) -> AVMMemoryStats {
166        self.runner.memory_stats()
167    }
168
169    #[allow(clippy::result_large_err, clippy::too_many_arguments)]
170    fn save_anomaly_data(
171        &mut self,
172        air_script: &str,
173        current_data: &[u8],
174        call_result: &CallResults,
175        particle_parameters: &ParticleParameters<'_>,
176        avm_outcome: &RawAVMOutcome,
177        execution_time: Duration,
178        memory_delta: usize,
179    ) -> AVMResult<(), E> {
180        let prev_data = self.data_store.read_data(
181            &particle_parameters.particle_id,
182            &particle_parameters.current_peer_id,
183        )?;
184        let call_results = serde_json::to_vec(call_result).map_err(AVMError::AnomalyDataSeError)?;
185        let ser_particle =
186            serde_json::to_vec(particle_parameters).map_err(AVMError::AnomalyDataSeError)?;
187        let ser_avm_outcome =
188            serde_json::to_vec(avm_outcome).map_err(AVMError::AnomalyDataSeError)?;
189
190        let anomaly_data = AnomalyData::new(
191            air_script,
192            &ser_particle,
193            &prev_data,
194            current_data,
195            &call_results,
196            &ser_avm_outcome,
197            execution_time,
198            memory_delta,
199        );
200
201        self.data_store
202            .collect_anomaly_data(
203                &particle_parameters.particle_id,
204                &particle_parameters.current_peer_id,
205                anomaly_data,
206            )
207            .map_err(Into::into)
208    }
209}