use super::avm_runner::AVMRunner;
use super::AVMDataStore;
use super::AVMError;
use super::AVMMemoryStats;
use crate::config::AVMConfig;
use crate::AVMResult;
use avm_data_store::AnomalyData;
use avm_interface::raw_outcome::RawAVMOutcome;
use avm_interface::AVMOutcome;
use avm_interface::CallResults;
use avm_interface::ParticleParameters;
use std::ops::Deref;
use std::ops::DerefMut;
use std::time::Duration;
use std::time::Instant;
/// Newtype wrapper around [`AVMRunner`] that exists to carry the `Send`
/// marker implemented just below.
struct SendSafeRunner(AVMRunner);
// SAFETY: NOTE(review): the invariant that makes moving an `AVMRunner` across
// threads sound is not visible in this file. Presumably the runner is only
// ever driven from one thread at a time through `&mut AVM` -- confirm against
// `AVMRunner`'s internals before relying on this.
unsafe impl Send for SendSafeRunner {}
impl Deref for SendSafeRunner {
type Target = AVMRunner;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for SendSafeRunner {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Couples an interpreter runner with a data store.
///
/// `E` is the error type parameter shared with `AVMDataStore<E>`,
/// `AVMConfig<E>` and `AVMResult<_, E>`.
pub struct AVM<E> {
// Runner used to execute calls and to report memory statistics.
runner: SendSafeRunner,
// Store used to read and write per-particle data and to collect anomaly data.
data_store: AVMDataStore<E>,
}
impl<E> AVM<E> {
    /// Creates an `AVM` from the given configuration.
    ///
    /// Initializes the data store first, then builds the runner from the
    /// configured wasm path, maximum heap size and logging mask.
    ///
    /// # Errors
    ///
    /// Propagates a data store initialization failure, and returns
    /// [`AVMError::RunnerError`] when the runner cannot be created.
    #[allow(clippy::result_large_err)]
    pub fn new(config: AVMConfig<E>) -> AVMResult<Self, E> {
        let AVMConfig {
            air_wasm_path,
            max_heap_size,
            logging_mask,
            mut data_store,
        } = config;

        data_store.initialize()?;

        let runner = AVMRunner::new(air_wasm_path, max_heap_size, logging_mask)
            .map_err(AVMError::RunnerError)?;
        let runner = SendSafeRunner(runner);

        let avm = Self { runner, data_store };
        Ok(avm)
    }

    /// Runs the given AIR script for one particle and persists the resulting data.
    ///
    /// The steps are:
    /// 1. read the data previously stored for `particle_parameters.particle_id`;
    /// 2. invoke the runner with the script, previous data, `data`, the
    ///    particle parameters and `call_results`;
    /// 3. measure the elapsed time and the growth of the reported memory size;
    /// 4. if the data store flags the measurements as anomalous, collect
    ///    anomaly data;
    /// 5. store the new data under the particle id and convert the raw
    ///    outcome into an [`AVMOutcome`].
    ///
    /// # Errors
    ///
    /// Propagates data store failures, returns [`AVMError::RunnerError`] when
    /// the runner call fails, and [`AVMError::InterpreterFailed`] when the raw
    /// outcome cannot be converted.
    #[allow(clippy::result_large_err)]
    pub fn call(
        &mut self,
        air: impl Into<String>,
        data: impl Into<Vec<u8>>,
        particle_parameters: ParticleParameters<'_>,
        call_results: CallResults,
    ) -> AVMResult<AVMOutcome, E> {
        let air = air.into();
        let particle_id = particle_parameters.particle_id.as_ref();
        let prev_data = self.data_store.read_data(particle_id)?;
        let current_data = data.into();

        let execution_start_time = Instant::now();
        let memory_size_before = self.memory_stats().memory_size;

        // `air` and `current_data` are cloned because both are needed again
        // below if anomaly data has to be saved.
        let outcome = self
            .runner
            .call(
                air.clone(),
                prev_data,
                current_data.clone(),
                particle_parameters.init_peer_id.clone().into_owned(),
                particle_parameters.timestamp,
                particle_parameters.ttl,
                particle_parameters.current_peer_id.clone(),
                call_results,
            )
            .map_err(AVMError::RunnerError)?;

        let execution_time = execution_start_time.elapsed();
        // Saturate instead of subtracting directly: should the reported size
        // ever be smaller than before the call, a plain `-` would panic in
        // debug builds and wrap to a huge bogus delta in release builds.
        let memory_delta = self
            .memory_stats()
            .memory_size
            .saturating_sub(memory_size_before);

        if self.data_store.detect_anomaly(execution_time, memory_delta) {
            self.save_anomaly_data(
                &air,
                &current_data,
                &particle_parameters,
                &outcome,
                execution_time,
                memory_delta,
            )?;
        }

        // Persist the new data only after the optional anomaly dump, which
        // re-reads the previous data from the store.
        self.data_store.store_data(&outcome.data, particle_id)?;

        let outcome = AVMOutcome::from_raw_outcome(outcome, memory_delta, execution_time)
            .map_err(AVMError::InterpreterFailed)?;
        Ok(outcome)
    }

    /// Asks the data store to clean up what it holds for `particle_id`.
    ///
    /// # Errors
    ///
    /// Propagates the data store failure, if any.
    #[allow(clippy::result_large_err)]
    pub fn cleanup_data(&mut self, particle_id: &str) -> AVMResult<(), E> {
        self.data_store.cleanup_data(particle_id)?;
        Ok(())
    }

    /// Returns the memory statistics reported by the underlying runner.
    pub fn memory_stats(&self) -> AVMMemoryStats {
        self.runner.memory_stats()
    }

    /// Assembles an [`AnomalyData`] record for one execution and hands it to
    /// the data store.
    ///
    /// The previous data is read from the store again here. The caller
    /// invokes this before storing the new data, so the read returns the
    /// pre-execution state. The particle parameters and the raw outcome are
    /// serialized to JSON bytes.
    ///
    /// # Errors
    ///
    /// Propagates data store failures and returns
    /// [`AVMError::AnomalyDataSeError`] when JSON serialization fails.
    #[allow(clippy::result_large_err)]
    fn save_anomaly_data(
        &mut self,
        air_script: &str,
        current_data: &[u8],
        particle_parameters: &ParticleParameters<'_>,
        avm_outcome: &RawAVMOutcome,
        execution_time: Duration,
        memory_delta: usize,
    ) -> AVMResult<(), E> {
        let prev_data = self
            .data_store
            .read_data(&particle_parameters.particle_id)?;

        let ser_particle =
            serde_json::to_vec(particle_parameters).map_err(AVMError::AnomalyDataSeError)?;
        let ser_avm_outcome =
            serde_json::to_vec(avm_outcome).map_err(AVMError::AnomalyDataSeError)?;

        let anomaly_data = AnomalyData::new(
            air_script,
            &ser_particle,
            &prev_data,
            current_data,
            &ser_avm_outcome,
            execution_time,
            memory_delta,
        );

        self.data_store
            .collect_anomaly_data(&particle_parameters.particle_id, anomaly_data)
            .map_err(Into::into)
    }
}