1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use serde_json::Value;
6use wasmtime::component::{Component, Linker, ResourceTable};
7use wasmtime::{Config, Engine, Store};
8use wasmtime_wasi::WasiCtxBuilder;
9
10use camel_core::Registry;
11
12use crate::bindings::Plugin;
13use crate::bindings::camel::plugin::types::WasmExchange;
14use crate::error::WasmError;
15
/// Host-side state stored inside each wasmtime `Store` created by [`WasmRuntime`].
///
/// A fresh value is built by [`WasmRuntime::create_host_state`] for every guest
/// invocation and moved into the store for that call.
pub struct WasmHostState {
    /// Resource table handed to WASI together with `wasi` (see the `WasiView` impl).
    pub table: ResourceTable,
    /// WASI context for the guest.
    pub wasi: wasmtime_wasi::WasiCtx,
    /// Key/value properties supplied by the caller for this invocation.
    pub properties: HashMap<String, Value>,
    /// Shared, mutex-guarded handle to the registry supplied by the caller.
    pub registry: Arc<std::sync::Mutex<Registry>>,
    /// Starts at 0 in `create_host_state`.
    /// NOTE(review): presumably tracks nested host/guest call depth — confirm in `host_functions`.
    pub call_depth: u32,
    /// Resource limits registered on the store through `Store::limiter`.
    pub limits: wasmtime::StoreLimits,
    /// State store handle supplied by the caller.
    pub state_store: crate::state_store::StateStore,
}
25
26impl wasmtime_wasi::WasiView for WasmHostState {
27 fn ctx(&mut self) -> wasmtime_wasi::WasiCtxView<'_> {
28 wasmtime_wasi::WasiCtxView {
29 ctx: &mut self.wasi,
30 table: &mut self.table,
31 }
32 }
33}
34
/// A loaded WASM component plus everything needed to instantiate and call it.
///
/// The engine, linker and compiled component are created once in [`WasmRuntime::new`];
/// each call then builds its own `Store` and instance from them.
pub struct WasmRuntime {
    /// Engine the component was loaded with; every per-call store is created from it.
    engine: Engine,
    /// Linker with WASI p2 (async) and the crate's host functions registered.
    linker: Linker<WasmHostState>,
    /// Component loaded from `module_path`.
    component: Component,
    /// Path the component was loaded from; also used as the plugin name in errors and logs.
    module_path: PathBuf,
    /// Runtime configuration (epoch deadline, timeout and memory figures used in errors).
    config: crate::config::WasmConfig,
    // Never read after construction: the field exists only to keep the ticker owned by
    // the runtime. NOTE(review): presumably dropping it stops epoch ticking — confirm in
    // `crate::epoch`.
    #[allow(dead_code)]
    epoch_ticker: crate::epoch::EpochTicker,
}
44
45impl WasmRuntime {
46 pub async fn new(
47 module_path: impl AsRef<Path>,
48 wasm_config: crate::config::WasmConfig,
49 ) -> Result<Self, WasmError> {
50 let module_path = module_path.as_ref().to_path_buf();
51
52 let mut config = Config::new();
53 config.wasm_component_model(true);
54 config.epoch_interruption(true);
55
56 let engine =
57 Engine::new(&config).map_err(|e| WasmError::CompilationFailed(e.to_string()))?;
58
59 let component = Component::from_file(&engine, &module_path).map_err(|e| {
60 WasmError::ModuleNotFound(format!(
61 "Failed to load WASM module {}: {}",
62 module_path.display(),
63 e
64 ))
65 })?;
66
67 let mut linker: Linker<WasmHostState> = Linker::new(&engine);
68
69 wasmtime_wasi::p2::add_to_linker_async(&mut linker)
70 .map_err(|e| WasmError::CompilationFailed(e.to_string()))?;
71
72 crate::host_functions::add_to_linker(&mut linker)
73 .map_err(|e| WasmError::CompilationFailed(e.to_string()))?;
74
75 let epoch_ticker =
76 crate::epoch::EpochTicker::start(engine.clone(), wasm_config.epoch_interval());
77
78 Ok(Self {
79 engine,
80 linker,
81 component,
82 module_path,
83 config: wasm_config,
84 epoch_ticker,
85 })
86 }
87
88 pub fn create_host_state(
89 registry: Arc<std::sync::Mutex<Registry>>,
90 properties: HashMap<String, Value>,
91 state_store: crate::state_store::StateStore,
92 ) -> WasmHostState {
93 WasmHostState {
94 table: ResourceTable::new(),
95 wasi: WasiCtxBuilder::new().inherit_stderr().build(),
96 properties,
97 registry,
98 call_depth: 0,
99 limits: wasmtime::StoreLimits::default(),
100 state_store,
101 }
102 }
103
104 fn classify_error(&self, e: wasmtime::Error) -> WasmError {
109 let plugin_name = self.module_path.display().to_string();
110 if let Some(trap) = e.downcast_ref::<wasmtime::Trap>() {
111 let reason = WasmError::classify_trap(trap);
112 match reason {
113 crate::error::TrapReason::Timeout => WasmError::Timeout {
114 plugin: plugin_name,
115 timeout_secs: self.config.timeout_secs,
116 },
117 crate::error::TrapReason::OutOfMemory => WasmError::OutOfMemory {
118 plugin: plugin_name,
119 max_memory_bytes: self.config.max_memory_bytes,
120 },
121 other => WasmError::Trap {
122 plugin: plugin_name,
123 reason: other,
124 },
125 }
126 } else {
127 WasmError::GuestPanic(e.to_string())
128 }
129 }
130
131 pub async fn call_init_once(
132 &self,
133 registry: Arc<std::sync::Mutex<Registry>>,
134 properties: HashMap<String, Value>,
135 state_store: crate::state_store::StateStore,
136 ) -> Result<(), WasmError> {
137 let host_state = Self::create_host_state(registry, properties, state_store);
138 let mut store = Store::new(&self.engine, host_state);
139 store.limiter(|state| &mut state.limits);
140 store.set_epoch_deadline(self.config.epoch_deadline());
141
142 let plugin = Plugin::instantiate_async(&mut store, &self.component, &self.linker)
143 .await
144 .map_err(|e| WasmError::InstantiationFailed(e.to_string()))?;
145
146 let result: Result<(), String> = plugin
147 .call_init(&mut store)
148 .await
149 .map_err(|e| self.classify_error(e))?;
150
151 if let Err(e) = result {
152 tracing::debug!(
153 "WASM init() returned error (optional hook): {} — {}",
154 self.module_path.display(),
155 e
156 );
157 }
158 Ok(())
159 }
160
161 pub async fn call_process(
162 &self,
163 registry: Arc<std::sync::Mutex<Registry>>,
164 properties: HashMap<String, Value>,
165 state_store: crate::state_store::StateStore,
166 exchange: WasmExchange,
167 ) -> Result<WasmExchange, WasmError> {
168 let host_state = Self::create_host_state(registry, properties, state_store);
169 let mut store = Store::new(&self.engine, host_state);
170 store.limiter(|state| &mut state.limits);
171 store.set_epoch_deadline(self.config.epoch_deadline());
172
173 let plugin = Plugin::instantiate_async(&mut store, &self.component, &self.linker)
174 .await
175 .map_err(|e| WasmError::InstantiationFailed(e.to_string()))?;
176
177 let result: Result<WasmExchange, crate::bindings::camel::plugin::types::WasmError> = plugin
178 .call_process(&mut store, &exchange)
179 .await
180 .map_err(|e| self.classify_error(e))?;
181
182 result.map_err(|wasm_err| match wasm_err {
183 crate::bindings::camel::plugin::types::WasmError::ProcessorError(s) => {
184 WasmError::GuestPanic(s)
185 }
186 crate::bindings::camel::plugin::types::WasmError::TypeConversion(s) => {
187 WasmError::TypeConversion(s)
188 }
189 crate::bindings::camel::plugin::types::WasmError::Io(s) => WasmError::Io(s),
190 crate::bindings::camel::plugin::types::WasmError::Timeout => {
191 WasmError::GuestPanic("guest timeout".to_string())
192 }
193 })
194 }
195
196 pub fn module_path(&self) -> &Path {
197 &self.module_path
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh registry wrapped in the `Arc<Mutex<_>>` handle the host state expects.
    fn shared_registry() -> Arc<std::sync::Mutex<Registry>> {
        Arc::new(std::sync::Mutex::new(Registry::new()))
    }

    /// Host state built through the runtime's factory with no properties.
    fn empty_host_state() -> WasmHostState {
        WasmRuntime::create_host_state(
            shared_registry(),
            HashMap::new(),
            crate::state_store::StateStore::new(),
        )
    }

    /// Engine with epoch interruption and the component model switched on.
    fn epoch_engine() -> Engine {
        let mut config = wasmtime::Config::new();
        config.epoch_interruption(true);
        config.wasm_component_model(true);
        Engine::new(&config).unwrap()
    }

    /// The struct can be assembled field by field and starts out empty.
    #[test]
    fn test_wasm_host_state_creation() {
        let state = WasmHostState {
            table: ResourceTable::new(),
            wasi: WasiCtxBuilder::new().inherit_stderr().build(),
            properties: HashMap::new(),
            registry: shared_registry(),
            call_depth: 0,
            limits: wasmtime::StoreLimits::default(),
            state_store: crate::state_store::StateStore::new(),
        };

        assert!(state.properties.is_empty());
        assert_eq!(state.call_depth, 0);
    }

    /// Compile-time check that the factory output exposes a `StoreLimits` field.
    #[test]
    fn test_host_state_has_limits_field() {
        let state = empty_host_state();
        let _limits: &wasmtime::StoreLimits = &state.limits;
    }

    /// Setting an epoch deadline on a store holding our host state must not panic.
    #[test]
    fn test_epoch_deadline_set_on_store() {
        let engine = epoch_engine();
        let mut store = Store::new(&engine, empty_host_state());
        store.set_epoch_deadline(500);
    }

    /// The `limits` field can be replaced and then registered as the store limiter.
    #[test]
    fn test_store_limiter_uses_host_state_limits() {
        let engine = epoch_engine();

        let mut host_state = empty_host_state();
        host_state.limits = wasmtime::StoreLimitsBuilder::new()
            .memory_size(1024)
            .build();

        let mut store = Store::new(&engine, host_state);
        store.limiter(|state| &mut state.limits);
    }
}