use super::{
RuntimeResult,
contract_store::ContractStore,
delegate_api::DelegateApiVersion,
delegate_store::DelegateStore,
engine::{BackendEngine, Engine, InstanceHandle, WasmEngine},
error::RuntimeInnerError,
native_api,
secrets_store::SecretsStore,
};
use freenet_stdlib::{
memory::{
WasmLinearMem,
buf::{BufferBuilder, BufferMut},
},
prelude::*,
};
use lru::LruCache;
use std::sync::{Arc, Mutex};
use std::{num::NonZeroUsize, sync::atomic::AtomicI64};
/// LRU cache of compiled WASM modules, shareable across `Runtime` instances
/// (see `Runtime::build_with_shared_module_caches`), keyed by contract or
/// delegate key.
pub(crate) type SharedModuleCache<K> = Arc<Mutex<LruCache<K, <Engine as WasmEngine>::Module>>>;
// Process-wide monotonically increasing id handed to each running WASM
// instance; also used as the key into `native_api::MEM_ADDR`.
static INSTANCE_ID: AtomicI64 = AtomicI64::new(0);
/// Default number of compiled modules retained per module cache.
pub const DEFAULT_MODULE_CACHE_CAPACITY: usize = 1024;
/// A live WASM instance created from a compiled module, plus the bookkeeping
/// needed to clean it up again.
pub(super) struct RunningInstance {
    // Unique id drawn from `INSTANCE_ID`; keys `native_api::MEM_ADDR`.
    pub id: i64,
    // Engine-side handle for the instantiated module.
    pub handle: InstanceHandle,
    // Whether the module exposes streaming IO (see `write_contract_buf`).
    pub supports_streaming: bool,
    // Set by `Runtime::drop_running_instance` once the engine has reclaimed
    // the instance; `Drop` logs a leak warning when this is still false.
    dropped_from_engine: bool,
}
impl RunningInstance {
fn new(
engine: &mut Engine,
module: &<Engine as WasmEngine>::Module,
key: Key,
req_bytes: usize,
) -> RuntimeResult<Self> {
let id = INSTANCE_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let handle = engine.create_instance(module, id, req_bytes)?;
let (ptr, size) = engine.memory_info(&handle)?;
native_api::MEM_ADDR.insert(id, InstanceInfo::new(ptr as i64, size, key));
let supports_streaming = engine.module_has_streaming_io(module);
Ok(Self {
id,
handle,
supports_streaming,
dropped_from_engine: false,
})
}
}
impl Drop for RunningInstance {
    /// Removes this instance's `MEM_ADDR` entry; warns (at debug level) when
    /// the engine side was never told to release the instance, since the
    /// underlying WASM instance then lives until the engine itself drops.
    fn drop(&mut self) {
        if self.dropped_from_engine {
            // Engine already reclaimed the instance; nothing to report.
        } else {
            tracing::debug!(
                instance_id = self.id,
                "RunningInstance dropped without engine cleanup — MEM_ADDR cleaned up, but WASM Instance will leak until engine is dropped"
            );
        }
        // Best-effort removal; the entry may already be absent.
        let _ = native_api::MEM_ADDR.remove(&self.id);
    }
}
/// Location of a running instance's linear memory plus the contract/delegate
/// key it was created for; stored in `native_api::MEM_ADDR` keyed by
/// instance id.
pub(crate) struct InstanceInfo {
    // Base address of the instance's linear memory, as an i64 for FFI use.
    pub start_ptr: i64,
    // Size of the linear memory region in bytes.
    pub mem_size: usize,
    // Which contract or delegate this instance belongs to.
    key: Key,
}
impl InstanceInfo {
pub(crate) fn new(start_ptr: i64, mem_size: usize, key: Key) -> Self {
Self {
start_ptr,
mem_size,
key,
}
}
pub fn key(&self) -> String {
match &self.key {
Key::Contract(k) => k.encode(),
Key::Delegate(k) => k.encode(),
}
}
}
/// Identifies which kind of component a running instance was created for.
pub(super) enum Key {
    Contract(ContractInstanceId),
    Delegate(DelegateKey),
}
/// Errors raised while executing contract WASM code.
#[derive(thiserror::Error, Debug)]
pub enum ContractExecError {
    /// Error surfaced by the contract interface itself.
    #[error(transparent)]
    ContractError(#[from] ContractError),
    /// A `put` was attempted for a contract that already has state.
    #[error("Attempted to perform a put for an already put contract ({0}), use update instead")]
    DoublePut(ContractKey),
    /// A buffer length did not fit in the `i32` sizes WASM uses.
    #[error("could not cast array length of {0} to max size (i32::MAX)")]
    InvalidArrayLength(usize),
    /// The contract returned something the host could not interpret.
    #[error("unexpected result from contract interface")]
    UnexpectedResult,
    /// Metering gas budget exhausted (only possible when metering is on).
    #[error(
        "The operation ran out of gas. This might be caused by an infinite loop or an inefficient computation."
    )]
    OutOfGas,
    /// Wall-clock execution budget exceeded.
    #[error("The operation exceeded the maximum allowed compute time")]
    MaxComputeTimeExceeded,
}
/// Tunables for the WASM execution engine.
pub struct RuntimeConfig {
    // Wall-clock budget for a single contract/delegate call, in seconds.
    pub max_execution_seconds: f64,
    // Measured CPU speed used to translate time budgets into gas; `None`
    // presumably lets the engine detect or use a default — confirm in
    // `Engine::new`.
    pub cpu_cycles_per_second: Option<u64>,
    // Fractional headroom applied on top of the execution budget.
    pub safety_margin: f64,
    // Whether gas metering is injected into compiled modules.
    pub enable_metering: bool,
    // Capacity of the per-runtime contract and delegate module caches.
    pub module_cache_capacity: usize,
}
impl Default for RuntimeConfig {
fn default() -> Self {
Self {
max_execution_seconds: 5.0,
cpu_cycles_per_second: None,
safety_margin: 0.2,
enable_metering: false,
module_cache_capacity: DEFAULT_MODULE_CACHE_CAPACITY,
}
}
}
/// Owns the WASM engine plus the stores and module caches needed to run
/// contracts and delegates.
pub struct Runtime {
    // Compilation/instantiation backend.
    pub(super) engine: Engine,
    // Secrets available to delegates.
    pub(super) secret_store: SecretsStore,
    // Delegate code storage.
    pub(super) delegate_store: DelegateStore,
    // Compiled delegate modules; may be shared between runtimes.
    pub(super) delegate_modules: SharedModuleCache<DelegateKey>,
    // Contract code storage.
    pub(crate) contract_store: ContractStore,
    // Compiled contract modules; may be shared between runtimes.
    pub(super) contract_modules: SharedModuleCache<ContractKey>,
    // Optional state-store DB handle, set after construction via
    // `set_state_store_db`.
    pub(crate) state_store_db: Option<crate::contract::storages::Storage>,
}
impl Runtime {
    /// Reports whether the underlying engine considers itself healthy.
    pub fn is_healthy(&self) -> bool {
        self.engine.is_healthy()
    }
    /// Clones the backend engine handle so another runtime can be built
    /// against the same backend (see `build_with_shared_module_caches`).
    pub(crate) fn clone_backend_engine(&self) -> BackendEngine {
        self.engine.clone_backend_engine()
    }
    /// Attaches the state-store database after construction.
    pub fn set_state_store_db(&mut self, db: crate::contract::storages::Storage) {
        self.state_store_db = Some(db);
    }
    /// Builds a runtime with an explicit configuration and fresh (unshared)
    /// module caches. `host_mem` is forwarded to the engine — presumably it
    /// selects host-managed linear memory; confirm in `Engine::new`.
    pub fn build_with_config(
        contract_store: ContractStore,
        delegate_store: DelegateStore,
        secret_store: SecretsStore,
        host_mem: bool,
        config: RuntimeConfig,
    ) -> RuntimeResult<Self> {
        // A configured capacity of 0 would panic inside LruCache::new, so
        // clamp it to the minimum of 1.
        let cache_capacity =
            NonZeroUsize::new(config.module_cache_capacity).unwrap_or(NonZeroUsize::MIN);
        let engine = Engine::new(&config, host_mem)?;
        Ok(Self {
            engine,
            secret_store,
            delegate_store,
            contract_modules: Arc::new(Mutex::new(LruCache::new(cache_capacity))),
            contract_store,
            delegate_modules: Arc::new(Mutex::new(LruCache::new(cache_capacity))),
            state_store_db: None,
        })
    }
    /// Builds a runtime with `RuntimeConfig::default()`.
    pub fn build(
        contract_store: ContractStore,
        delegate_store: DelegateStore,
        secret_store: SecretsStore,
        host_mem: bool,
    ) -> RuntimeResult<Self> {
        Self::build_with_config(
            contract_store,
            delegate_store,
            secret_store,
            host_mem,
            RuntimeConfig::default(),
        )
    }
    /// Builds a runtime that reuses pre-existing module caches and a shared
    /// backend engine, so compiled modules are shared across runtimes.
    pub(crate) fn build_with_shared_module_caches(
        contract_store: ContractStore,
        delegate_store: DelegateStore,
        secret_store: SecretsStore,
        host_mem: bool,
        contract_modules: SharedModuleCache<ContractKey>,
        delegate_modules: SharedModuleCache<DelegateKey>,
        shared_backend: BackendEngine,
    ) -> RuntimeResult<Self> {
        // NOTE(review): this path always uses the default config — any
        // caller-specific metering/time settings are not applied here.
        let engine =
            Engine::new_with_shared_backend(&RuntimeConfig::default(), host_mem, shared_backend)?;
        Ok(Self {
            engine,
            secret_store,
            delegate_store,
            contract_modules,
            contract_store,
            delegate_modules,
            state_store_db: None,
        })
    }
    /// Releases the instance on the engine side and marks it so that
    /// `RunningInstance::drop` does not log a leak warning.
    pub(super) fn drop_running_instance(&mut self, running: &mut RunningInstance) {
        self.engine.drop_instance(&running.handle);
        running.dropped_from_engine = true;
    }
    /// Allocates a guest-side buffer sized for `data` and wraps it in a
    /// host-side `BufferMut` view. Note: `data` is only used for its length
    /// here; the caller writes the bytes.
    pub(super) fn init_buf<T>(
        &mut self,
        handle: &InstanceHandle,
        data: T,
    ) -> RuntimeResult<BufferMut<'_>>
    where
        T: AsRef<[u8]>,
    {
        let data = data.as_ref();
        let builder_ptr = self.engine.initiate_buffer(handle, data.len() as u32)?;
        let linear_mem = self.linear_mem(handle)?;
        // SAFETY-ish: `builder_ptr` was just produced by the engine for this
        // instance's linear memory — TODO confirm `initiate_buffer` returns a
        // valid `BufferBuilder` pointer.
        unsafe {
            Ok(BufferMut::from_ptr(
                builder_ptr as *mut BufferBuilder,
                linear_mem,
            ))
        }
    }
    /// Like `init_buf`, but sized by an explicit byte `capacity`.
    pub(super) fn init_buf_with_capacity(
        &mut self,
        handle: &InstanceHandle,
        capacity: usize,
    ) -> RuntimeResult<BufferMut<'_>> {
        let builder_ptr = self.engine.initiate_buffer(handle, capacity as u32)?;
        let linear_mem = self.linear_mem(handle)?;
        // See safety note in `init_buf`.
        unsafe {
            Ok(BufferMut::from_ptr(
                builder_ptr as *mut BufferBuilder,
                linear_mem,
            ))
        }
    }
    /// Writes `data` for a streaming-capable instance: a 4-byte little-endian
    /// total-length header plus as much payload as fits in `max_cap`; any
    /// remainder is parked in `CONTRACT_IO` for the guest to pull later.
    pub(super) fn write_streaming_buf(
        &mut self,
        handle: &InstanceHandle,
        instance_id: i64,
        data: &[u8],
        max_cap: usize,
    ) -> RuntimeResult<*mut BufferBuilder> {
        use super::native_api::{CONTRACT_IO, PendingContractData};
        // u32 length prefix written before the payload.
        let header_size = 4usize;
        // Invariant checked in debug builds only; a `max_cap < 4` in release
        // would underflow `buf_cap - header_size` below.
        debug_assert!(max_cap >= header_size, "max_cap must be >= {header_size}");
        if data.len() > u32::MAX as usize {
            return Err(super::ContractExecError::InvalidArrayLength(data.len()).into());
        }
        // Buffer holds header + payload, capped at `max_cap`.
        let buf_cap = max_cap.min(data.len().saturating_add(header_size));
        let mut buf = self.init_buf_with_capacity(handle, buf_cap)?;
        let total_len = data.len() as u32;
        buf.write(total_len.to_le_bytes())?;
        let first_chunk_size = data.len().min(buf_cap - header_size);
        buf.write(&data[..first_chunk_size])?;
        let ptr = buf.ptr();
        if first_chunk_size < data.len() {
            // Stash the tail, keyed by (instance, buffer ptr), so the guest's
            // streaming reads can drain it.
            CONTRACT_IO.insert(
                (instance_id, ptr as i64),
                PendingContractData {
                    data: data[first_chunk_size..].to_vec(),
                    cursor: 0,
                },
            );
        }
        Ok(ptr)
    }
    /// Writes raw `data` into a contract instance, choosing the streaming or
    /// plain path based on the module's capabilities. `max_cap` only applies
    /// to the streaming path.
    pub(super) fn write_contract_buf(
        &mut self,
        running: &RunningInstance,
        data: &[u8],
        max_cap: usize,
    ) -> RuntimeResult<*mut BufferBuilder> {
        if running.supports_streaming {
            self.write_streaming_buf(&running.handle, running.id, data, max_cap)
        } else {
            let mut buf = self.init_buf(&running.handle, data)?;
            buf.write(data)?;
            Ok(buf.ptr())
        }
    }
    /// Serializes `value` with bincode and writes it into the instance; the
    /// non-streaming path sizes the buffer exactly via `serialized_size` and
    /// serializes directly into guest memory (no intermediate Vec).
    pub(super) fn write_contract_buf_serialized<T: serde::Serialize + ?Sized>(
        &mut self,
        running: &RunningInstance,
        value: &T,
        max_cap: usize,
    ) -> RuntimeResult<*mut BufferBuilder> {
        if running.supports_streaming {
            let serialized = bincode::serialize(value)?;
            self.write_streaming_buf(&running.handle, running.id, &serialized, max_cap)
        } else {
            let size = bincode::serialized_size(value)? as usize;
            let mut buf = self.init_buf_with_capacity(&running.handle, size)?;
            bincode::serialize_into(&mut buf, value)?;
            Ok(buf.ptr())
        }
    }
    /// Snapshot of the instance's linear memory as a `WasmLinearMem` view.
    pub(super) fn linear_mem(&mut self, handle: &InstanceHandle) -> RuntimeResult<WasmLinearMem> {
        let (ptr, size) = self.engine.memory_info(handle)?;
        // SAFETY-ish: ptr/size come straight from the engine for this handle;
        // validity depends on the instance staying alive — TODO confirm.
        Ok(unsafe { WasmLinearMem::new(ptr, size as u64) })
    }
    /// Resolves (compiling and caching if needed) the contract's module and
    /// spins up a fresh instance for a single call.
    pub(super) fn prepare_contract_call(
        &mut self,
        key: &ContractKey,
        parameters: &Parameters,
        req_bytes: usize,
    ) -> RuntimeResult<RunningInstance> {
        // Fast path: clone out of the cache without holding the lock while
        // compiling.
        let cached = self.contract_modules.lock().unwrap().get(key).cloned();
        let module = if let Some(module) = cached {
            tracing::debug!(contract = %key, "Module cache hit");
            module
        } else {
            tracing::info!(contract = %key, "Module cache miss — compiling");
            let contract = self
                .contract_store
                .fetch_contract(key, parameters)
                .ok_or_else(|| {
                    tracing::error!(
                        contract = %key,
                        key_code_hash = ?key.code_hash(),
                        phase = "prepare_contract_call_failed",
                        "Contract not found in store during WASM execution"
                    );
                    RuntimeInnerError::ContractNotFound(*key)
                })?;
            // Only V1 WASM contracts are supported; anything else aborts.
            let code = match contract {
                ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract_v1)) => {
                    contract_v1.code().data().to_vec()
                }
                ContractContainer::Wasm(_) | _ => unimplemented!(),
            };
            let module = self.engine.compile(&code)?;
            // Re-check under the lock: another thread may have compiled and
            // inserted the same module while we were compiling.
            let mut cache = self.contract_modules.lock().unwrap();
            if let Some(existing) = cache.get(key).cloned() {
                existing
            } else {
                if let Some((evicted_key, _)) = cache.push(*key, module.clone()) {
                    tracing::warn!(
                        evicted_contract = %evicted_key,
                        cache_capacity = cache.cap().get(),
                        "Module cache eviction. \
                        Consider increasing DEFAULT_MODULE_CACHE_CAPACITY"
                    );
                }
                module
            }
        };
        RunningInstance::new(
            &mut self.engine,
            &module,
            Key::Contract(*key.id()),
            req_bytes,
        )
    }
    /// Delegate analogue of `prepare_contract_call`; additionally detects the
    /// delegate API version from the module's imports.
    pub(super) fn prepare_delegate_call(
        &mut self,
        params: &Parameters,
        key: &DelegateKey,
        req_bytes: usize,
    ) -> RuntimeResult<(RunningInstance, DelegateApiVersion)> {
        let cached = self.delegate_modules.lock().unwrap().get(key).cloned();
        let module = if let Some(module) = cached {
            tracing::debug!(delegate = %key, "Module cache hit");
            module
        } else {
            tracing::info!(delegate = %key, "Module cache miss — compiling");
            let delegate = self
                .delegate_store
                .fetch_delegate(key, params)
                .ok_or_else(|| RuntimeInnerError::DelegateNotFound(key.clone()))?;
            let code = delegate.code().as_ref().to_vec();
            let module = self.engine.compile(&code)?;
            // Same re-check-under-lock pattern as prepare_contract_call.
            let mut cache = self.delegate_modules.lock().unwrap();
            if let Some(existing) = cache.get(key).cloned() {
                existing
            } else {
                if let Some((evicted_key, _)) = cache.push(key.clone(), module.clone()) {
                    tracing::warn!(
                        evicted_delegate = %evicted_key,
                        cache_capacity = cache.cap().get(),
                        "Delegate cache eviction. \
                        Consider increasing DEFAULT_MODULE_CACHE_CAPACITY"
                    );
                }
                module
            }
        };
        // Async imports imply the V2 delegate API.
        let api_version = if self.engine.module_has_async_imports(&module) {
            DelegateApiVersion::V2
        } else {
            DelegateApiVersion::V1
        };
        let running = RunningInstance::new(
            &mut self.engine,
            &module,
            Key::Delegate(key.clone()),
            req_bytes,
        )?;
        Ok((running, api_version))
    }
}
/// Exposes the runtime's `ContractStore` through the `ContractStoreBridge`
/// trait; every method is a thin delegation to the underlying store.
impl super::contract::ContractStoreBridge for Runtime {
    /// Looks up the code hash recorded for a contract instance id.
    fn code_hash_from_id(&self, id: &ContractInstanceId) -> Option<CodeHash> {
        self.contract_store.code_hash_from_id(id)
    }
    /// Fetches the stored contract container for `key`/`params`, if any.
    fn fetch_contract_code(
        &self,
        key: &ContractKey,
        params: &Parameters<'_>,
    ) -> Option<ContractContainer> {
        self.contract_store.fetch_contract(key, params)
    }
    /// Persists a contract container into the store.
    fn store_contract(&mut self, contract: ContractContainer) -> Result<(), anyhow::Error> {
        // `?` converts the store's error type into `anyhow::Error`.
        Ok(self.contract_store.store_contract(contract)?)
    }
    /// Removes a stored contract by key.
    fn remove_contract(&mut self, key: &ContractKey) -> Result<(), anyhow::Error> {
        Ok(self.contract_store.remove_contract(key)?)
    }
    /// Makes sure the store's key index contains `key`.
    fn ensure_key_indexed(&mut self, key: &ContractKey) -> Result<(), anyhow::Error> {
        Ok(self.contract_store.ensure_key_indexed(key)?)
    }
}
impl super::contract::ContractRuntimeBridge for Runtime {}