use std::fmt;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use surrealism_types::err::{PrefixErr, SurrealismError, SurrealismResult};
use tokio::sync::Semaphore;
use wasmtime::*;
use web_time::Instant;
use crate::config::{AbiVersion, SurrealismConfig};
use crate::controller::Controller;
use crate::epoch::{self, EngineHandle};
use crate::exports::ExportsManifest;
use crate::host::{InvocationContext, implement_host_functions};
use crate::kv::BTreeMapStore;
use crate::net_allow::{ResolvedNetAllow, resolve_allow_net};
use crate::package::{AttachedFs, SurrealismPackage};
use crate::store::StoreData;
/// A loaded module together with everything needed to hand out [`Controller`]s
/// for it: the pre-instantiated component, the effective resource limits, and
/// a pool of idle controllers.
pub struct Runtime {
/// Engine handle obtained from `epoch::shared_engine`; supplies the wasmtime
/// engine and the epoch counter used for deadlines.
engine_handle: EngineHandle,
/// Component with imports already resolved (see `build`); instantiated once
/// per newly created controller.
instance_pre: component::InstancePre<StoreData>,
/// Module configuration, shared with every store's `StoreData`.
config: Arc<SurrealismConfig>,
/// Length of the module's wasm binary as reported by `wasm.len()`.
wasm_size: usize,
/// Optional filesystem attached to the package; its path is handed to the
/// WASI context builder.
fs_dir: Option<AttachedFs>,
/// Idle controllers returned through `release_controller`, ready for reuse.
pool: parking_lot::Mutex<Vec<Controller>>,
/// Semaphore created with `max_pool_size.max(1)` permits; a permit is acquired
/// before any controller is handed out.
controller_slots: Arc<Semaphore>,
/// Exports manifest taken from the package.
exports: ExportsManifest,
/// Key-value store created with the effective entry/value-size limits.
kv_store: Arc<BTreeMapStore>,
/// Upper bound on the number of idle controllers kept in `pool`.
max_pool_size: usize,
/// Effective memory limit applied to each store's limiter, if any.
max_memory_bytes: Option<usize>,
/// Effective execution time limit passed to each new controller, if any.
module_execution_time: Option<Duration>,
/// Result of resolving `config.capabilities.allow_net` at construction time.
resolved_allow_net: Arc<Vec<ResolvedNetAllow>>,
}
impl fmt::Debug for Runtime {
    /// Formats a summary of the runtime: configuration, sizes and limits.
    ///
    /// Engine, component and pooled controllers are not printed; the output is
    /// marked non-exhaustive to reflect that.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The lock guard is a temporary, so it is released at the end of this
        // statement and is not held while formatting.
        let idle_controllers = self.pool.lock().len();
        let exported_functions = self.exports.functions.len();

        let mut out = f.debug_struct("Runtime");
        out.field("config", &self.config);
        out.field("wasm_size", &self.wasm_size);
        out.field("fs_dir", &self.fs_dir);
        out.field("pool_size", &idle_controllers);
        out.field("max_pool_size", &self.max_pool_size);
        out.field("max_memory_bytes", &self.max_memory_bytes);
        out.field("module_execution_time", &self.module_execution_time);
        out.field("exported_functions", &exported_functions);
        out.finish_non_exhaustive()
    }
}
impl Runtime {
/// Builds a [`Runtime`] from an unpacked module package and the server-side
/// limits.
///
/// Every effective limit is the stricter of the server setting and the value
/// declared in the module's capabilities; when only one side sets a limit,
/// that one is used. The pool size is the module's `max_pool_size` capped by
/// `server_pool_size`, or `server_pool_size` when the module declares none.
///
/// # Errors
///
/// * [`SurrealismError::UnsupportedAbi`] when the package ABI differs from
///   `AbiVersion::CURRENT`.
/// * Any error from compiling / pre-instantiating the component (see `build`).
/// * Any error from resolving the module's `allow_net` entries.
pub fn new(
    SurrealismPackage {
        wasm,
        config,
        exports,
        fs,
        logo: _,
    }: SurrealismPackage,
    server_pool_size: usize,
    server_max_memory: Option<usize>,
    server_max_execution_time: Option<Duration>,
    server_max_kv_entries: Option<usize>,
    server_max_kv_value_bytes: Option<usize>,
) -> SurrealismResult<Self> {
    /// Returns the smaller of the two limits when both are set, otherwise
    /// whichever one is set (or `None` when neither is).
    fn stricter<T: Ord>(server: Option<T>, module: Option<T>) -> Option<T> {
        match (server, module) {
            (Some(server), Some(module)) => Some(server.min(module)),
            (server, module) => server.or(module),
        }
    }

    if config.abi != AbiVersion::CURRENT {
        return Err(SurrealismError::UnsupportedAbi {
            expected: AbiVersion::CURRENT.0,
            got: config.abi.0,
        });
    }
    let t0 = Instant::now();

    let max_pool_size = config
        .capabilities
        .max_pool_size
        .map_or(server_pool_size, |m| m.min(server_pool_size));
    let max_memory_bytes = stricter(server_max_memory, config.capabilities.max_memory_bytes);
    let module_execution_time = stricter(
        server_max_execution_time,
        config.capabilities.max_execution_time,
    );
    let max_kv_entries = stricter(server_max_kv_entries, config.capabilities.max_kv_entries);
    let max_kv_value_bytes = stricter(
        server_max_kv_value_bytes,
        config.capabilities.max_kv_value_bytes,
    );

    let kv_store = Arc::new(BTreeMapStore::with_limits(max_kv_entries, max_kv_value_bytes));
    let config = Arc::new(config);
    let wasm_size = wasm.len();
    tracing::debug!(
        wasm_size,
        fs = fs.is_some(),
        max_pool_size,
        ?max_memory_bytes,
        ?module_execution_time,
        "Runtime::new starting"
    );

    let guarded = config.capabilities.strict_timeout;
    let engine_handle = epoch::shared_engine(guarded);
    tracing::debug!(
        strict_timeout = guarded,
        engine = if guarded {
            "guarded"
        } else {
            "fast"
        },
        "Runtime::new: selected engine"
    );

    let instance_pre = Self::build(engine_handle.engine(), &wasm)?;
    tracing::debug!(elapsed = ?t0.elapsed(), "Runtime::new build done");

    let resolved_allow_net = resolve_allow_net(&config.capabilities.allow_net)
        .prefix_err(|| "Failed to resolve allow_net entries")?;

    // At least one slot so a pool size of zero can still serve calls one at a
    // time, and never more than the semaphore supports: `Semaphore::new`
    // panics above `MAX_PERMITS`, which an "unlimited" pool size would hit.
    let slot_count = max_pool_size.clamp(1, Semaphore::MAX_PERMITS);
    let controller_slots = Arc::new(Semaphore::new(slot_count));

    Ok(Self {
        engine_handle,
        instance_pre,
        config,
        wasm_size,
        fs_dir: fs,
        pool: parking_lot::Mutex::new(Vec::new()),
        controller_slots,
        exports,
        kv_store,
        max_pool_size,
        max_memory_bytes,
        module_execution_time,
        resolved_allow_net,
    })
}
/// Returns the length of the module's wasm binary recorded at construction.
pub fn wasm_size(&self) -> usize {
self.wasm_size
}
/// Returns the runtime's key-value store handle.
pub fn kv_store(&self) -> &Arc<BTreeMapStore> {
&self.kv_store
}
/// Returns the module configuration this runtime was created with.
pub fn config(&self) -> &SurrealismConfig {
    self.config.as_ref()
}
/// Returns a new handle to the network allow-list resolved at construction.
///
/// Only the reference count is bumped; the list itself is not copied.
pub fn resolved_allow_net(&self) -> Arc<Vec<ResolvedNetAllow>> {
    self.resolved_allow_net.clone()
}
/// Returns the largest tick count that can be handed to
/// `Store::set_epoch_deadline` given the current epoch counter value:
/// `u64::MAX - current - 1`, saturating at zero.
pub(crate) fn epoch_deadline_max(&self) -> u64 {
    let current = self.engine_handle.epoch_counter().load(Ordering::Acquire);
    // `current <= u64::MAX`, so the first subtraction cannot underflow.
    let headroom = u64::MAX - current;
    headroom.saturating_sub(1)
}
/// Compiles `wasm` as a component on `engine`, wires up WASI P2 and the host
/// functions, and pre-instantiates it.
///
/// Each phase is timed and reported at debug level.
///
/// # Errors
///
/// Fails when the bytes are not a valid component, when the linker cannot be
/// populated, or when pre-instantiation fails.
fn build(engine: &Engine, wasm: &[u8]) -> SurrealismResult<component::InstancePre<StoreData>> {
    let total_timer = Instant::now();

    // Phase 1: compile the component.
    let component = component::Component::new(engine, wasm)
        .prefix_err(|| "Failed to construct component from bytes")?;
    tracing::debug!(elapsed = ?total_timer.elapsed(), "build: Component::new");

    // Phase 2: populate the linker with WASI and host imports.
    let linker_timer = Instant::now();
    let mut linker = component::Linker::<StoreData>::new(engine);
    wasmtime_wasi::p2::add_to_linker_async(&mut linker)
        .prefix_err(|| "failed to add WASI P2 to component linker")?;
    implement_host_functions(&mut linker)
        .prefix_err(|| "failed to implement host functions")?;
    tracing::debug!(elapsed = ?linker_timer.elapsed(), "build: linker setup");

    // Phase 3: resolve imports once, up front.
    let pre_timer = Instant::now();
    let pre = linker
        .instantiate_pre(&component)
        .prefix_err(|| "failed to pre-instantiate component (import resolution)")?;
    tracing::debug!(elapsed = ?pre_timer.elapsed(), "build: instantiate_pre");

    tracing::debug!(elapsed = ?total_timer.elapsed(), "build: total");
    Ok(pre)
}
/// Hands out a controller bound to `context`, reusing an idle one from the
/// pool when available and otherwise creating and initialising a new one.
///
/// A controller slot is awaited first, so the call waits while all slots are
/// in use.
///
/// # Errors
///
/// Fails when the slot semaphore is closed, or when creating or initialising
/// a new controller fails.
#[tracing::instrument(skip_all)]
pub async fn acquire_controller(
    &self,
    context: Box<dyn InvocationContext>,
) -> SurrealismResult<Controller> {
    let permit = self.acquire_slot().await?;

    // Keep the pool lock confined to this block so it is never held across
    // an `.await`.
    let reusable = {
        let mut idle = self.pool.lock();
        let before = idle.len();
        let candidate = idle.pop();
        tracing::debug!(
            pool_size_before = before,
            got_pooled = candidate.is_some(),
            "acquire_controller: pool.pop()"
        );
        candidate
    };

    if let Some(mut ctrl) = reusable {
        tracing::debug!("acquire_controller: reusing pooled controller");
        ctrl.attach_controller_slot(permit);
        ctrl.reset_epoch_deadline();
        ctrl.set_context(context);
        return Ok(ctrl);
    }

    tracing::info!("acquire_controller: creating NEW controller + init()");
    let mut ctrl = self.create_controller(context, permit).await?;
    ctrl.init().await?;
    Ok(ctrl)
}
/// Returns a controller after use.
///
/// The invocation context is cleared and the controller is parked in the idle
/// pool if there is room (`max_pool_size`); otherwise it is dropped.
///
/// The controller's slot permit is released only *after* the controller has
/// been parked or dropped. Releasing it first would let a waiter wake up,
/// find the pool still empty and build a brand-new controller even though
/// this one is about to become available; it would also allow a new store to
/// be created while the old one is still alive.
pub fn release_controller(&self, mut controller: Controller) {
    controller.clear_context();

    // Detach the permit so a pooled controller never pins a slot, but keep it
    // alive until the controller has been dealt with.
    let slot = controller.take_controller_slot();

    let overflow = {
        let mut pool = self.pool.lock();
        if pool.len() < self.max_pool_size {
            tracing::debug!(
                pool_size_after = pool.len() + 1,
                max_pool_size = self.max_pool_size,
                "release_controller: returned to pool"
            );
            pool.push(controller);
            None
        } else {
            tracing::info!(
                pool_size = pool.len(),
                max_pool_size = self.max_pool_size,
                "release_controller: pool full, dropping controller"
            );
            Some(controller)
        }
    };

    // Tear down a rejected controller outside the pool lock, then free the
    // slot for the next caller.
    drop(overflow);
    drop(slot);
}
/// Looks up the export entry for `sub` in the exports manifest.
///
/// # Errors
///
/// Returns [`SurrealismError::Other`] when the manifest has no matching
/// entry; a `None` name is reported as `<default>` in the message.
pub fn get_signature(
    &self,
    sub: Option<&str>,
) -> SurrealismResult<&crate::exports::FunctionExport> {
    match self.exports.get_signature(sub) {
        Some(export) => Ok(export),
        None => {
            let name = sub.unwrap_or("<default>");
            Err(SurrealismError::Other(anyhow::anyhow!(
                "function '{name}' not found in exports manifest"
            )))
        }
    }
}
/// Returns the exports manifest that came with the package.
pub fn exports(&self) -> &ExportsManifest {
&self.exports
}
/// Creates a fresh controller bound to `context`, bypassing the idle pool.
///
/// A controller slot is still awaited first. This method does not call
/// `init()` on the controller it returns.
///
/// # Errors
///
/// Fails when the slot semaphore is closed or when controller creation fails.
#[tracing::instrument(skip_all)]
pub async fn new_controller(
    &self,
    context: Box<dyn InvocationContext>,
) -> SurrealismResult<Controller> {
    let slot = self.acquire_slot().await?;
    let controller = self.create_controller(context, slot).await?;
    Ok(controller)
}
async fn acquire_slot(&self) -> SurrealismResult<tokio::sync::OwnedSemaphorePermit> {
Arc::clone(&self.controller_slots).acquire_owned().await.map_err(|_| {
SurrealismError::Other(anyhow::anyhow!(
"Surrealism controller semaphore closed (runtime shutdown?)"
))
})
}
/// Instantiates the component in a new store and wraps it in a [`Controller`].
///
/// Steps: build the WASI context (attached filesystem, network allow-list,
/// stdout/stderr callbacks taken from `context`), apply the memory limit if
/// one is configured, set the epoch deadline, instantiate, then look up the
/// exports. `invoke` is required; every other export is optional.
///
/// `controller_slot` is moved into the returned controller. On any error it
/// is dropped together with the other locals.
///
/// # Errors
///
/// Fails when the WASI context cannot be built, when instantiation fails
/// ([`SurrealismError::Instantiation`]), or when the `invoke` export is
/// missing.
#[tracing::instrument(skip_all)]
async fn create_controller(
    &self,
    context: Box<dyn InvocationContext>,
    controller_slot: tokio::sync::OwnedSemaphorePermit,
) -> SurrealismResult<Controller> {
    let started = Instant::now();

    let attached_root = self.fs_dir.as_ref().map(|fs| fs.path());

    // Output callbacks are shared between the WASI context and the store
    // data; seed them from the invocation context.
    let stdout_cb = crate::wasi_context::new_stdout_callback();
    let stderr_cb = crate::wasi_context::new_stderr_callback();
    *stdout_cb.lock() = context.stdout_callback();
    *stderr_cb.lock() = context.stderr_callback();

    let (wasi, table) = crate::wasi_context::build(
        attached_root,
        Arc::clone(&self.resolved_allow_net),
        Arc::clone(&stdout_cb),
        Arc::clone(&stderr_cb),
    )?;
    tracing::debug!(elapsed = ?started.elapsed(), "new_controller: wasi_context::build");

    // Only constrain memory when a limit is configured.
    let limiter = match self.max_memory_bytes {
        Some(cap) => StoreLimitsBuilder::new().memory_size(cap),
        None => StoreLimitsBuilder::new(),
    }
    .build();

    let mut store = Store::new(
        self.engine_handle.engine(),
        StoreData {
            wasi,
            table,
            config: Arc::clone(&self.config),
            context,
            limiter,
            stdout_cb,
            stderr_cb,
        },
    );
    store.limiter(|data| &mut data.limiter);
    store.set_epoch_deadline(self.epoch_deadline_max());

    let instantiate_timer = Instant::now();
    let instance = self
        .instance_pre
        .instantiate_async(&mut store)
        .await
        .map_err(SurrealismError::Instantiation)?;
    tracing::debug!(elapsed = ?instantiate_timer.elapsed(), "new_controller: instantiate_async");

    let lookup_timer = Instant::now();
    let invoke_fn = match instance.get_func(&mut store, "invoke") {
        Some(func) => func,
        None => {
            return Err(SurrealismError::Other(anyhow::anyhow!(
                "component is missing required export 'invoke'. \
                 Ensure the module is built with `surreal module build`"
            )));
        }
    };
    let args_fn = instance.get_func(&mut store, "function-args");
    let returns_fn = instance.get_func(&mut store, "function-returns");
    let list_fn = instance.get_func(&mut store, "list-functions");
    let writeable_fn = instance.get_func(&mut store, "function-writeable");
    let comment_fn = instance.get_func(&mut store, "function-comment");
    let init_fn = instance.get_func(&mut store, "init");
    tracing::debug!(
        elapsed = ?lookup_timer.elapsed(),
        has_invoke = true,
        has_args = args_fn.is_some(),
        has_returns = returns_fn.is_some(),
        has_list = list_fn.is_some(),
        has_writeable = writeable_fn.is_some(),
        has_comment = comment_fn.is_some(),
        has_init = init_fn.is_some(),
        "new_controller: export lookup"
    );

    tracing::info!(elapsed = ?started.elapsed(), "new_controller: total");

    let epoch_counter = Arc::clone(self.engine_handle.epoch_counter());
    Ok(Controller::new(
        store,
        invoke_fn,
        args_fn,
        returns_fn,
        list_fn,
        writeable_fn,
        comment_fn,
        init_fn,
        self.module_execution_time,
        epoch_counter,
        controller_slot,
    ))
}
}