use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use serde_json::Value;
use wasmtime::component::{Component, Linker, ResourceTable};
use wasmtime::{Config, Engine, Store};
use wasmtime_wasi::WasiCtxBuilder;
use camel_core::Registry;
use crate::bindings::Plugin;
use crate::bindings::camel::plugin::types::WasmExchange;
use crate::error::WasmError;
pub struct WasmHostState {
pub table: ResourceTable,
pub wasi: wasmtime_wasi::WasiCtx,
pub properties: HashMap<String, Value>,
pub registry: Arc<std::sync::Mutex<Registry>>,
pub call_depth: u32,
pub limits: wasmtime::StoreLimits,
pub state_store: crate::state_store::StateStore,
pub tokio_handle: tokio::runtime::Handle,
}
impl wasmtime_wasi::WasiView for WasmHostState {
fn ctx(&mut self) -> wasmtime_wasi::WasiCtxView<'_> {
wasmtime_wasi::WasiCtxView {
ctx: &mut self.wasi,
table: &mut self.table,
}
}
}
pub struct WasmRuntime {
engine: Engine,
linker: Linker<WasmHostState>,
component: Component,
module_path: PathBuf,
config: crate::config::WasmConfig,
#[allow(dead_code)]
epoch_ticker: crate::epoch::EpochTicker,
}
impl WasmRuntime {
pub async fn new(
module_path: impl AsRef<Path>,
wasm_config: crate::config::WasmConfig,
) -> Result<Self, WasmError> {
let module_path = module_path.as_ref().to_path_buf();
let mut config = Config::new();
config.wasm_component_model(true);
#[cfg(feature = "epoch-interruption")]
config.epoch_interruption(true);
let engine =
Engine::new(&config).map_err(|e| WasmError::CompilationFailed(e.to_string()))?;
let component = Component::from_file(&engine, &module_path).map_err(|e| {
WasmError::ModuleNotFound(format!(
"Failed to load WASM module {}: {}",
module_path.display(),
e
))
})?;
let mut linker: Linker<WasmHostState> = Linker::new(&engine);
wasmtime_wasi::p2::add_to_linker_async(&mut linker)
.map_err(|e| WasmError::CompilationFailed(e.to_string()))?;
crate::host_functions::add_to_linker(&mut linker)
.map_err(|e| WasmError::CompilationFailed(e.to_string()))?;
let epoch_ticker =
crate::epoch::EpochTicker::start(engine.clone(), wasm_config.epoch_interval());
Ok(Self {
engine,
linker,
component,
module_path,
config: wasm_config,
epoch_ticker,
})
}
pub fn create_host_state(
registry: Arc<std::sync::Mutex<Registry>>,
properties: HashMap<String, Value>,
state_store: crate::state_store::StateStore,
tokio_handle: tokio::runtime::Handle,
max_memory_bytes: u64,
) -> WasmHostState {
let limits = if max_memory_bytes == 0 {
wasmtime::StoreLimits::default()
} else {
wasmtime::StoreLimitsBuilder::new()
.memory_size(max_memory_bytes as usize)
.build()
};
WasmHostState {
table: ResourceTable::new(),
wasi: WasiCtxBuilder::new().inherit_stderr().build(),
properties,
registry,
call_depth: 0,
limits,
state_store,
tokio_handle,
}
}
fn classify_error(&self, e: wasmtime::Error) -> WasmError {
self.config.classify_error(&self.module_path, e)
}
pub async fn call_init_once(
&self,
registry: Arc<std::sync::Mutex<Registry>>,
properties: HashMap<String, Value>,
state_store: crate::state_store::StateStore,
tokio_handle: tokio::runtime::Handle,
) -> Result<(), WasmError> {
let host_state = Self::create_host_state(
registry,
properties,
state_store,
tokio_handle,
self.config.max_memory_bytes,
);
let mut store = Store::new(&self.engine, host_state);
store.limiter(|state| &mut state.limits);
store.set_epoch_deadline(self.config.epoch_deadline());
let plugin = Plugin::instantiate_async(&mut store, &self.component, &self.linker)
.await
.map_err(|e| WasmError::InstantiationFailed(e.to_string()))?;
let result: Result<(), String> = plugin
.call_init(&mut store)
.await
.map_err(|e| self.classify_error(e))?;
if let Err(e) = result {
tracing::debug!(
"WASM init() returned error (optional hook): {} — {}",
self.module_path.display(),
e
);
}
Ok(())
}
pub async fn call_process(
&self,
registry: Arc<std::sync::Mutex<Registry>>,
properties: HashMap<String, Value>,
state_store: crate::state_store::StateStore,
tokio_handle: tokio::runtime::Handle,
exchange: WasmExchange,
) -> Result<WasmExchange, WasmError> {
let host_state = Self::create_host_state(
registry,
properties,
state_store,
tokio_handle,
self.config.max_memory_bytes,
);
let mut store = Store::new(&self.engine, host_state);
store.limiter(|state| &mut state.limits);
store.set_epoch_deadline(self.config.epoch_deadline());
let plugin = Plugin::instantiate_async(&mut store, &self.component, &self.linker)
.await
.map_err(|e| WasmError::InstantiationFailed(e.to_string()))?;
let result: Result<WasmExchange, crate::bindings::camel::plugin::types::WasmError> = plugin
.call_process(&mut store, &exchange)
.await
.map_err(|e| self.classify_error(e))?;
result.map_err(|wasm_err| match wasm_err {
crate::bindings::camel::plugin::types::WasmError::ProcessorError(s) => {
WasmError::GuestPanic(s)
}
crate::bindings::camel::plugin::types::WasmError::TypeConversion(s) => {
WasmError::TypeConversion(s)
}
crate::bindings::camel::plugin::types::WasmError::Io(s) => WasmError::Io(s),
crate::bindings::camel::plugin::types::WasmError::Timeout => {
WasmError::GuestPanic("guest timeout".to_string())
}
})
}
pub fn module_path(&self) -> &Path {
&self.module_path
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::OnceLock;
fn test_tokio_handle() -> tokio::runtime::Handle {
static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
RT.get_or_init(|| tokio::runtime::Runtime::new().expect("test runtime"))
.handle()
.clone()
}
#[test]
fn test_wasm_host_state_creation() {
let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
let props = HashMap::new();
let state = WasmHostState {
table: ResourceTable::new(),
wasi: WasiCtxBuilder::new().inherit_stderr().build(),
properties: props,
registry,
call_depth: 0,
limits: wasmtime::StoreLimits::default(),
state_store: crate::state_store::StateStore::new(),
tokio_handle: test_tokio_handle(),
};
assert!(state.properties.is_empty());
assert_eq!(state.call_depth, 0);
}
#[test]
fn create_host_state_with_zero_memory_falls_back_to_default() {
let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
let host_state = WasmRuntime::create_host_state(
registry,
HashMap::new(),
crate::state_store::StateStore::new(),
test_tokio_handle(),
0,
);
let _ = host_state; }
#[tokio::test]
async fn memory_growth_rejected_past_configured_cap() {
let wat = r#"
(module
(memory $mem (export "memory") 1)
(func (export "grow") (param i32) (result i32)
local.get 0
memory.grow $mem)
)
"#;
let config = wasmtime::Config::new();
let engine = Engine::new(&config).unwrap();
let module = wasmtime::Module::new(&engine, wat).expect("compile wat");
let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
let host_state = WasmRuntime::create_host_state(
registry,
HashMap::new(),
crate::state_store::StateStore::new(),
test_tokio_handle(),
64 * 1024, );
let mut store = Store::new(&engine, host_state);
store.limiter(|state| &mut state.limits);
let instance = wasmtime::Instance::new_async(&mut store, &module, &[])
.await
.expect("instantiate");
let grow = instance
.get_typed_func::<i32, i32>(&mut store, "grow")
.expect("get grow export");
let result = grow.call_async(&mut store, 64).await.expect("grow call");
assert_eq!(
result, -1,
"memory.grow must return -1 when the cap (64 KiB) would be exceeded"
);
}
#[tokio::test]
async fn memory_growth_allowed_under_cap() {
let wat = r#"
(module
(memory $mem (export "memory") 1)
(func (export "grow") (param i32) (result i32)
local.get 0
memory.grow $mem)
)
"#;
let config = wasmtime::Config::new();
let engine = Engine::new(&config).unwrap();
let module = wasmtime::Module::new(&engine, wat).expect("compile wat");
let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
let host_state = WasmRuntime::create_host_state(
registry,
HashMap::new(),
crate::state_store::StateStore::new(),
test_tokio_handle(),
128 * 1024,
);
let mut store = Store::new(&engine, host_state);
store.limiter(|state| &mut state.limits);
let instance = wasmtime::Instance::new_async(&mut store, &module, &[])
.await
.expect("instantiate");
let grow = instance
.get_typed_func::<i32, i32>(&mut store, "grow")
.expect("get grow export");
let result = grow.call_async(&mut store, 1).await.expect("grow call");
assert_eq!(result, 1, "memory.grow of 1 page under cap must succeed");
}
#[test]
fn test_host_state_has_limits_field() {
let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
let state = WasmRuntime::create_host_state(
registry,
HashMap::new(),
crate::state_store::StateStore::new(),
test_tokio_handle(),
0,
);
let _limits: &wasmtime::StoreLimits = &state.limits;
}
#[test]
fn test_epoch_deadline_set_on_store() {
let mut config = wasmtime::Config::new();
config.epoch_interruption(true);
config.wasm_component_model(true);
let engine = Engine::new(&config).unwrap();
let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
let host_state = WasmRuntime::create_host_state(
registry,
HashMap::new(),
crate::state_store::StateStore::new(),
test_tokio_handle(),
0,
);
let mut store = Store::new(&engine, host_state);
store.set_epoch_deadline(500);
}
#[test]
fn test_store_limiter_uses_host_state_limits() {
let mut config = wasmtime::Config::new();
config.epoch_interruption(true);
config.wasm_component_model(true);
let engine = Engine::new(&config).unwrap();
let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
let host_state = WasmRuntime::create_host_state(
registry,
HashMap::new(),
crate::state_store::StateStore::new(),
test_tokio_handle(),
1024, );
let mut store = Store::new(&engine, host_state);
store.limiter(|state| &mut state.limits);
}
#[tokio::test]
async fn timeout_kills_infinite_loop_guest() {
let wat = r#"
(module
(func (export "loop_forever")
loop
br 0
end
)
)
"#;
let mut config = wasmtime::Config::new();
config.epoch_interruption(true);
let engine = Engine::new(&config).unwrap();
let module = wasmtime::Module::new(&engine, wat).expect("compile wat");
let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
let host_state = WasmRuntime::create_host_state(
registry,
HashMap::new(),
crate::state_store::StateStore::new(),
test_tokio_handle(),
0, );
let mut store = Store::new(&engine, host_state);
store.set_epoch_deadline(1);
use std::sync::atomic::{AtomicBool, Ordering};
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = shutdown.clone();
let engine_clone = engine.clone();
let ticker = std::thread::spawn(move || {
while !shutdown_clone.load(Ordering::SeqCst) {
std::thread::sleep(std::time::Duration::from_millis(10));
engine_clone.increment_epoch();
}
});
let instance = wasmtime::Instance::new_async(&mut store, &module, &[])
.await
.expect("instantiate");
let func = instance
.get_typed_func::<(), ()>(&mut store, "loop_forever")
.expect("get loop_forever export");
let start = std::time::Instant::now();
let result = func.call_async(&mut store, ()).await;
let elapsed = start.elapsed();
shutdown.store(true, Ordering::SeqCst);
ticker.join().expect("ticker thread to exit cleanly");
assert!(
result.is_err(),
"infinite loop must be killed by epoch interruption"
);
assert!(
elapsed < std::time::Duration::from_secs(2),
"timeout must trigger quickly, took {:?}",
elapsed
);
}
}