Skip to main content

camel_component_wasm/
runtime.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use serde_json::Value;
6use wasmtime::component::{Component, Linker, ResourceTable};
7use wasmtime::{Config, Engine, Store};
8use wasmtime_wasi::WasiCtxBuilder;
9
10use camel_core::Registry;
11
12use crate::bindings::Plugin;
13use crate::bindings::camel::plugin::types::WasmExchange;
14use crate::error::WasmError;
15
16pub struct WasmHostState {
17    pub table: ResourceTable,
18    pub wasi: wasmtime_wasi::WasiCtx,
19    pub properties: HashMap<String, Value>,
20    pub registry: Arc<std::sync::Mutex<Registry>>,
21    pub call_depth: u32,
22    pub limits: wasmtime::StoreLimits,
23    pub state_store: crate::state_store::StateStore,
24}
25
26impl wasmtime_wasi::WasiView for WasmHostState {
27    fn ctx(&mut self) -> wasmtime_wasi::WasiCtxView<'_> {
28        wasmtime_wasi::WasiCtxView {
29            ctx: &mut self.wasi,
30            table: &mut self.table,
31        }
32    }
33}
34
35pub struct WasmRuntime {
36    engine: Engine,
37    linker: Linker<WasmHostState>,
38    component: Component,
39    module_path: PathBuf,
40    config: crate::config::WasmConfig,
41    #[allow(dead_code)]
42    epoch_ticker: crate::epoch::EpochTicker,
43}
44
45impl WasmRuntime {
46    pub async fn new(
47        module_path: impl AsRef<Path>,
48        wasm_config: crate::config::WasmConfig,
49    ) -> Result<Self, WasmError> {
50        let module_path = module_path.as_ref().to_path_buf();
51
52        let mut config = Config::new();
53        config.wasm_component_model(true);
54        config.epoch_interruption(true);
55
56        let engine =
57            Engine::new(&config).map_err(|e| WasmError::CompilationFailed(e.to_string()))?;
58
59        let component = Component::from_file(&engine, &module_path).map_err(|e| {
60            WasmError::ModuleNotFound(format!(
61                "Failed to load WASM module {}: {}",
62                module_path.display(),
63                e
64            ))
65        })?;
66
67        let mut linker: Linker<WasmHostState> = Linker::new(&engine);
68
69        wasmtime_wasi::p2::add_to_linker_async(&mut linker)
70            .map_err(|e| WasmError::CompilationFailed(e.to_string()))?;
71
72        crate::host_functions::add_to_linker(&mut linker)
73            .map_err(|e| WasmError::CompilationFailed(e.to_string()))?;
74
75        let epoch_ticker =
76            crate::epoch::EpochTicker::start(engine.clone(), wasm_config.epoch_interval());
77
78        Ok(Self {
79            engine,
80            linker,
81            component,
82            module_path,
83            config: wasm_config,
84            epoch_ticker,
85        })
86    }
87
88    pub fn create_host_state(
89        registry: Arc<std::sync::Mutex<Registry>>,
90        properties: HashMap<String, Value>,
91        state_store: crate::state_store::StateStore,
92    ) -> WasmHostState {
93        WasmHostState {
94            table: ResourceTable::new(),
95            wasi: WasiCtxBuilder::new().inherit_stderr().build(),
96            properties,
97            registry,
98            call_depth: 0,
99            limits: wasmtime::StoreLimits::default(),
100            state_store,
101        }
102    }
103
104    /// Classify a wasmtime error into a structured WasmError.
105    ///
106    /// Downcasts to `wasmtime::Trap` first — if successful, routes to
107    /// Timeout/OutOfMemory/Trap variants. Otherwise falls back to GuestPanic.
108    fn classify_error(&self, e: wasmtime::Error) -> WasmError {
109        let plugin_name = self.module_path.display().to_string();
110        if let Some(trap) = e.downcast_ref::<wasmtime::Trap>() {
111            let reason = WasmError::classify_trap(trap);
112            match reason {
113                crate::error::TrapReason::Timeout => WasmError::Timeout {
114                    plugin: plugin_name,
115                    timeout_secs: self.config.timeout_secs,
116                },
117                crate::error::TrapReason::OutOfMemory => WasmError::OutOfMemory {
118                    plugin: plugin_name,
119                    max_memory_bytes: self.config.max_memory_bytes,
120                },
121                other => WasmError::Trap {
122                    plugin: plugin_name,
123                    reason: other,
124                },
125            }
126        } else {
127            WasmError::GuestPanic(e.to_string())
128        }
129    }
130
131    pub async fn call_init_once(
132        &self,
133        registry: Arc<std::sync::Mutex<Registry>>,
134        properties: HashMap<String, Value>,
135        state_store: crate::state_store::StateStore,
136    ) -> Result<(), WasmError> {
137        let host_state = Self::create_host_state(registry, properties, state_store);
138        let mut store = Store::new(&self.engine, host_state);
139        store.limiter(|state| &mut state.limits);
140        store.set_epoch_deadline(self.config.epoch_deadline());
141
142        let plugin = Plugin::instantiate_async(&mut store, &self.component, &self.linker)
143            .await
144            .map_err(|e| WasmError::InstantiationFailed(e.to_string()))?;
145
146        let result: Result<(), String> = plugin
147            .call_init(&mut store)
148            .await
149            .map_err(|e| self.classify_error(e))?;
150
151        if let Err(e) = result {
152            tracing::debug!(
153                "WASM init() returned error (optional hook): {} — {}",
154                self.module_path.display(),
155                e
156            );
157        }
158        Ok(())
159    }
160
161    pub async fn call_process(
162        &self,
163        registry: Arc<std::sync::Mutex<Registry>>,
164        properties: HashMap<String, Value>,
165        state_store: crate::state_store::StateStore,
166        exchange: WasmExchange,
167    ) -> Result<WasmExchange, WasmError> {
168        let host_state = Self::create_host_state(registry, properties, state_store);
169        let mut store = Store::new(&self.engine, host_state);
170        store.limiter(|state| &mut state.limits);
171        store.set_epoch_deadline(self.config.epoch_deadline());
172
173        let plugin = Plugin::instantiate_async(&mut store, &self.component, &self.linker)
174            .await
175            .map_err(|e| WasmError::InstantiationFailed(e.to_string()))?;
176
177        let result: Result<WasmExchange, crate::bindings::camel::plugin::types::WasmError> = plugin
178            .call_process(&mut store, &exchange)
179            .await
180            .map_err(|e| self.classify_error(e))?;
181
182        result.map_err(|wasm_err| match wasm_err {
183            crate::bindings::camel::plugin::types::WasmError::ProcessorError(s) => {
184                WasmError::GuestPanic(s)
185            }
186            crate::bindings::camel::plugin::types::WasmError::TypeConversion(s) => {
187                WasmError::TypeConversion(s)
188            }
189            crate::bindings::camel::plugin::types::WasmError::Io(s) => WasmError::Io(s),
190            crate::bindings::camel::plugin::types::WasmError::Timeout => {
191                WasmError::GuestPanic("guest timeout".to_string())
192            }
193        })
194    }
195
196    pub fn module_path(&self) -> &Path {
197        &self.module_path
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use super::*;
204
205    #[test]
206    fn test_wasm_host_state_creation() {
207        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
208        let props = HashMap::new();
209        let state = WasmHostState {
210            table: ResourceTable::new(),
211            wasi: WasiCtxBuilder::new().inherit_stderr().build(),
212            properties: props,
213            registry,
214            call_depth: 0,
215            limits: wasmtime::StoreLimits::default(),
216            state_store: crate::state_store::StateStore::new(),
217        };
218        assert!(state.properties.is_empty());
219        assert_eq!(state.call_depth, 0);
220    }
221
222    #[test]
223    fn test_host_state_has_limits_field() {
224        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
225        let state = WasmRuntime::create_host_state(
226            registry,
227            HashMap::new(),
228            crate::state_store::StateStore::new(),
229        );
230        let _limits: &wasmtime::StoreLimits = &state.limits;
231    }
232
233    #[test]
234    fn test_epoch_deadline_set_on_store() {
235        let mut config = wasmtime::Config::new();
236        config.epoch_interruption(true);
237        config.wasm_component_model(true);
238        let engine = Engine::new(&config).unwrap();
239        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
240        let host_state = WasmRuntime::create_host_state(
241            registry,
242            HashMap::new(),
243            crate::state_store::StateStore::new(),
244        );
245        let mut store = Store::new(&engine, host_state);
246        store.set_epoch_deadline(500);
247        // NOTE: wasmtime v31 does not expose `get_epoch_deadline()` on Store,
248        // so we cannot assert the value was set. This test verifies the API
249        // compiles and does not panic at runtime. The actual deadline enforcement
250        // is validated indirectly by the epoch_interruption integration tests.
251    }
252
253    #[test]
254    fn test_store_limiter_uses_host_state_limits() {
255        let mut config = wasmtime::Config::new();
256        config.epoch_interruption(true);
257        config.wasm_component_model(true);
258        let engine = Engine::new(&config).unwrap();
259        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
260        let mut host_state = WasmRuntime::create_host_state(
261            registry,
262            HashMap::new(),
263            crate::state_store::StateStore::new(),
264        );
265        host_state.limits = wasmtime::StoreLimitsBuilder::new()
266            .memory_size(1024)
267            .build();
268        let mut store = Store::new(&engine, host_state);
269        store.limiter(|state| &mut state.limits);
270        // Verifies store.limiter accepts WasmHostState::limits — compilation test
271    }
272}