Skip to main content

camel_component_wasm/
host_functions.rs

1use std::sync::atomic::{AtomicU32, Ordering};
2
3use serde_json::Value;
4use tower::ServiceExt;
5use wasmtime::component::{HasSelf, Linker};
6
7use crate::bindings::camel::plugin::host::Host;
8use crate::bindings::camel::plugin::types::WasmError;
9use crate::runtime::WasmHostState;
10
11static NESTING_DEPTH: AtomicU32 = AtomicU32::new(0);
12
13pub fn current_nesting_depth() -> u32 {
14    NESTING_DEPTH.load(Ordering::Relaxed)
15}
16
17impl Host for WasmHostState {
18    fn camel_call(&mut self, uri: String, payload: String) -> Result<String, WasmError> {
19        let depth = NESTING_DEPTH.fetch_add(1, Ordering::Relaxed);
20        if depth > 0 {
21            NESTING_DEPTH.fetch_sub(1, Ordering::Relaxed);
22            return Err(WasmError::ProcessorError(
23                "recursive wasm calls not supported".to_string(),
24            ));
25        }
26
27        let registry = self.registry.clone();
28        let result = tokio::runtime::Handle::current().block_on(async {
29            let scheme = uri.split(':').next().unwrap_or("").to_string();
30            if scheme.is_empty() {
31                return Err(WasmError::ProcessorError(format!(
32                    "invalid URI (no scheme): {}",
33                    uri
34                )));
35            }
36
37            let component = {
38                let guard = registry
39                    .lock()
40                    .map_err(|e| WasmError::Io(format!("registry lock poisoned: {}", e)))?;
41                guard.get(&scheme).ok_or_else(|| {
42                    WasmError::ProcessorError(format!("component not found for scheme: {}", scheme))
43                })?
44            };
45
46            let endpoint = component
47                .create_endpoint(&uri, &camel_component_api::NoOpComponentContext)
48                .map_err(|e| WasmError::ProcessorError(format!("create_endpoint failed: {}", e)))?;
49
50            let producer = endpoint
51                .create_producer(&camel_api::ProducerContext::new())
52                .map_err(|e| WasmError::ProcessorError(format!("create_producer failed: {}", e)))?;
53
54            let exchange =
55                camel_api::Exchange::new(camel_api::Message::new(camel_api::Body::Text(payload)));
56
57            let result = producer
58                .oneshot(exchange)
59                .await
60                .map_err(|e| WasmError::ProcessorError(format!("endpoint call failed: {}", e)))?;
61
62            let body_str = match &result.output {
63                Some(msg) => match &msg.body {
64                    camel_api::Body::Text(s) => s.clone(),
65                    camel_api::Body::Json(v) => v.to_string(),
66                    camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
67                    camel_api::Body::Xml(s) => s.clone(),
68                    camel_api::Body::Empty => String::new(),
69                    camel_api::Body::Stream(_) => "<stream>".to_string(),
70                },
71                None => match &result.input.body {
72                    camel_api::Body::Text(s) => s.clone(),
73                    camel_api::Body::Json(v) => v.to_string(),
74                    camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
75                    camel_api::Body::Xml(s) => s.clone(),
76                    camel_api::Body::Empty => String::new(),
77                    camel_api::Body::Stream(_) => "<stream>".to_string(),
78                },
79            };
80
81            Ok(body_str)
82        });
83
84        NESTING_DEPTH.fetch_sub(1, Ordering::Relaxed);
85        result
86    }
87
88    fn get_property(&mut self, key: String) -> Option<String> {
89        self.properties.get(&key).map(|v| match v {
90            Value::String(s) => s.clone(),
91            other => other.to_string(),
92        })
93    }
94
95    fn set_property(&mut self, key: String, value: String) {
96        let parsed = serde_json::from_str::<Value>(&value).unwrap_or(Value::String(value));
97        self.properties.insert(key, parsed);
98    }
99
100    fn host_store(&mut self, key: String, value: String) -> Result<(), WasmError> {
101        self.state_store.store(&key, &value).map_err(WasmError::Io)
102    }
103
104    fn host_load(&mut self, key: String) -> Result<Option<String>, WasmError> {
105        self.state_store.load(&key).map_err(WasmError::Io)
106    }
107}
108
109pub fn add_to_linker(linker: &mut Linker<WasmHostState>) -> Result<(), wasmtime::Error> {
110    crate::bindings::camel::plugin::host::add_to_linker::<_, HasSelf<_>>(linker, |state| state)
111}
112
113pub fn add_bean_to_linker(linker: &mut Linker<WasmHostState>) -> Result<(), wasmtime::Error> {
114    crate::bean_bindings::camel::plugin::host::add_to_linker::<_, HasSelf<_>>(linker, |state| state)
115}
116
117use crate::bean_bindings::camel::plugin::host::Host as BeanHost;
118use crate::bean_bindings::camel::plugin::types::WasmError as BeanWasmError;
119
120impl BeanHost for WasmHostState {
121    fn camel_call(&mut self, uri: String, payload: String) -> Result<String, BeanWasmError> {
122        let host = self as &mut dyn Host;
123        host.camel_call(uri, payload).map_err(|e| match e {
124            WasmError::ProcessorError(s) => BeanWasmError::ProcessorError(s),
125            WasmError::TypeConversion(s) => BeanWasmError::TypeConversion(s),
126            WasmError::Io(s) => BeanWasmError::Io(s),
127            WasmError::Timeout => BeanWasmError::Timeout,
128        })
129    }
130
131    fn get_property(&mut self, key: String) -> Option<String> {
132        let host = self as &mut dyn Host;
133        host.get_property(key)
134    }
135
136    fn set_property(&mut self, key: String, value: String) {
137        let host = self as &mut dyn Host;
138        host.set_property(key, value)
139    }
140
141    fn host_store(&mut self, key: String, value: String) -> Result<(), BeanWasmError> {
142        let host = self as &mut dyn Host;
143        host.host_store(key, value).map_err(|e| match e {
144            WasmError::Io(s) => BeanWasmError::Io(s),
145            other => BeanWasmError::Io(other.to_string()),
146        })
147    }
148
149    fn host_load(&mut self, key: String) -> Result<Option<String>, BeanWasmError> {
150        let host = self as &mut dyn Host;
151        host.host_load(key).map_err(|e| match e {
152            WasmError::Io(s) => BeanWasmError::Io(s),
153            other => BeanWasmError::Io(other.to_string()),
154        })
155    }
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161    use crate::runtime::WasmHostState;
162    use camel_core::Registry;
163    use std::collections::HashMap;
164    use std::sync::Arc;
165
166    fn make_state(call_depth: u32) -> WasmHostState {
167        WasmHostState {
168            table: wasmtime::component::ResourceTable::new(),
169            wasi: wasmtime_wasi::WasiCtxBuilder::new()
170                .inherit_stderr()
171                .build(),
172            properties: HashMap::new(),
173            registry: Arc::new(std::sync::Mutex::new(Registry::new())),
174            call_depth,
175            limits: wasmtime::StoreLimits::default(),
176            state_store: crate::state_store::StateStore::new(),
177        }
178    }
179
180    #[test]
181    fn test_recursion_guard_blocks_nested_calls() {
182        let state = make_state(1);
183        assert!(state.call_depth > 0);
184    }
185
186    #[test]
187    fn test_recursion_guard_allows_initial_call() {
188        let state = make_state(0);
189        assert_eq!(state.call_depth, 0);
190    }
191
192    #[test]
193    fn test_get_property_string_value() {
194        let mut state = make_state(0);
195        state
196            .properties
197            .insert("key".to_string(), Value::String("value".to_string()));
198        let value = state.properties.get("key").map(|v| match v {
199            Value::String(s) => s.clone(),
200            other => other.to_string(),
201        });
202        assert_eq!(value, Some("value".to_string()));
203    }
204
205    #[test]
206    fn test_get_property_missing_key() {
207        let state = make_state(0);
208        assert!(!state.properties.contains_key("missing"));
209    }
210
211    #[test]
212    fn test_set_property_json_value() {
213        let mut state = make_state(0);
214        let parsed = serde_json::from_str::<Value>("{\"nested\":true}")
215            .unwrap_or(Value::String("{\"nested\":true}".to_string()));
216        state.properties.insert("json_key".to_string(), parsed);
217        assert!(state.properties.get("json_key").unwrap().is_object());
218    }
219
220    #[test]
221    fn test_uri_scheme_parsing() {
222        assert_eq!("direct".split(':').next().unwrap_or(""), "direct");
223        assert_eq!("log:info".split(':').next().unwrap_or(""), "log");
224        assert_eq!("noscheme".split(':').next().unwrap_or(""), "noscheme");
225        assert_eq!("".split(':').next().unwrap_or(""), "");
226    }
227}