1use super::avm_runner::AVMRunner;
21use super::AVMDataStore;
22use super::AVMError;
23use super::AVMMemoryStats;
24use crate::config::AVMConfig;
25use crate::AVMResult;
26
27use avm_data_store::AnomalyData;
28use avm_interface::raw_outcome::RawAVMOutcome;
29use avm_interface::AVMOutcome;
30use avm_interface::CallResults;
31use avm_interface::ParticleParameters;
32use fluence_keypair::KeyPair;
33
34use marine_wasm_backend_traits::WasmBackend;
35
36use std::ops::Deref;
37use std::ops::DerefMut;
38use std::time::Duration;
39use std::time::Instant;
40
41struct SendSafeRunner<WB: WasmBackend>(AVMRunner<WB>);
43
44unsafe impl<WB: WasmBackend> Send for SendSafeRunner<WB> {}
46
47impl<WB: WasmBackend> Deref for SendSafeRunner<WB> {
48 type Target = AVMRunner<WB>;
49
50 fn deref(&self) -> &Self::Target {
51 &self.0
52 }
53}
54impl<WB: WasmBackend> DerefMut for SendSafeRunner<WB> {
55 fn deref_mut(&mut self) -> &mut Self::Target {
56 &mut self.0
57 }
58}
59
60pub struct AVM<E, WB: WasmBackend> {
61 runner: SendSafeRunner<WB>,
62 data_store: AVMDataStore<E>,
63}
64
65impl<E, WB: WasmBackend> AVM<E, WB> {
66 #[allow(clippy::result_large_err)]
68 pub async fn new(config: AVMConfig<E>, wasm_backend: WB) -> AVMResult<Self, E> {
69 let AVMConfig {
70 air_wasm_path,
71 max_heap_size,
72 logging_mask,
73 mut data_store,
74 } = config;
75
76 data_store.initialize()?;
77
78 let runner = AVMRunner::new(
79 air_wasm_path,
80 max_heap_size,
81 <_>::default(),
82 logging_mask,
83 wasm_backend,
84 )
85 .await
86 .map_err(AVMError::RunnerError)?;
87 let runner = SendSafeRunner(runner);
88 let avm = Self { runner, data_store };
89
90 Ok(avm)
91 }
92
93 #[allow(clippy::result_large_err)]
94 pub async fn call(
95 &mut self,
96 air: impl Into<String>,
97 data: impl Into<Vec<u8>>,
98 particle_parameters: ParticleParameters<'_>,
99 call_results: CallResults,
100 keypair: &KeyPair,
101 ) -> AVMResult<AVMOutcome, E> {
102 let air = air.into();
103 let prev_data = self.data_store.read_data(
104 &particle_parameters.particle_id,
105 &particle_parameters.current_peer_id,
106 )?;
107 let current_data = data.into();
108
109 let execution_start_time = Instant::now();
110 let memory_size_before = self.memory_stats().memory_size;
111 let outcome = self
112 .runner
113 .call(
114 air.clone(),
115 prev_data,
116 current_data.clone(),
117 particle_parameters.init_peer_id.clone().into_owned(),
118 particle_parameters.timestamp,
119 particle_parameters.ttl,
120 particle_parameters.current_peer_id.clone(),
121 call_results.clone(),
122 keypair,
123 particle_parameters.particle_id.to_string(),
124 )
125 .await
126 .map_err(AVMError::RunnerError)?;
127
128 let execution_time = execution_start_time.elapsed();
129 let memory_delta = self.memory_stats().memory_size - memory_size_before;
130 if self
131 .data_store
132 .detect_anomaly(execution_time, memory_delta, &outcome)
133 {
134 self.save_anomaly_data(
135 &air,
136 ¤t_data,
137 &call_results,
138 &particle_parameters,
139 &outcome,
140 execution_time,
141 memory_delta,
142 )?;
143 }
144
145 self.data_store.store_data(
147 &outcome.data,
148 &particle_parameters.particle_id,
149 &particle_parameters.current_peer_id,
150 )?;
151 let outcome = AVMOutcome::from_raw_outcome(outcome, memory_delta, execution_time)
152 .map_err(AVMError::InterpreterFailed)?;
153
154 Ok(outcome)
155 }
156
157 #[allow(clippy::result_large_err)]
159 pub fn cleanup_data(&mut self, particle_id: &str, current_peer_id: &str) -> AVMResult<(), E> {
160 self.data_store.cleanup_data(particle_id, current_peer_id)?;
161 Ok(())
162 }
163
164 pub fn memory_stats(&self) -> AVMMemoryStats {
166 self.runner.memory_stats()
167 }
168
169 #[allow(clippy::result_large_err, clippy::too_many_arguments)]
170 fn save_anomaly_data(
171 &mut self,
172 air_script: &str,
173 current_data: &[u8],
174 call_result: &CallResults,
175 particle_parameters: &ParticleParameters<'_>,
176 avm_outcome: &RawAVMOutcome,
177 execution_time: Duration,
178 memory_delta: usize,
179 ) -> AVMResult<(), E> {
180 let prev_data = self.data_store.read_data(
181 &particle_parameters.particle_id,
182 &particle_parameters.current_peer_id,
183 )?;
184 let call_results = serde_json::to_vec(call_result).map_err(AVMError::AnomalyDataSeError)?;
185 let ser_particle =
186 serde_json::to_vec(particle_parameters).map_err(AVMError::AnomalyDataSeError)?;
187 let ser_avm_outcome =
188 serde_json::to_vec(avm_outcome).map_err(AVMError::AnomalyDataSeError)?;
189
190 let anomaly_data = AnomalyData::new(
191 air_script,
192 &ser_particle,
193 &prev_data,
194 current_data,
195 &call_results,
196 &ser_avm_outcome,
197 execution_time,
198 memory_delta,
199 );
200
201 self.data_store
202 .collect_anomaly_data(
203 &particle_parameters.particle_id,
204 &particle_parameters.current_peer_id,
205 anomaly_data,
206 )
207 .map_err(Into::into)
208 }
209}