1use std::sync::atomic::{AtomicU32, Ordering};
2
3use serde_json::Value;
4use tower::ServiceExt;
5use wasmtime::component::{HasSelf, Linker};
6
7use crate::bindings::camel::plugin::host::Host;
8use crate::bindings::camel::plugin::types::WasmError;
9use crate::runtime::WasmHostState;
10
/// Process-wide count of host `camel_call` invocations currently in flight.
///
/// This is a single static shared by the whole process, not a per-instance value.
static NESTING_DEPTH: AtomicU32 = AtomicU32::new(0);

/// Returns a snapshot of the current `camel_call` nesting depth.
///
/// The value is read with relaxed ordering, so it is only a point-in-time observation.
pub fn current_nesting_depth() -> u32 {
    AtomicU32::load(&NESTING_DEPTH, Ordering::Relaxed)
}
16
17impl Host for WasmHostState {
18 fn camel_call(&mut self, uri: String, payload: String) -> Result<String, WasmError> {
19 let depth = NESTING_DEPTH.fetch_add(1, Ordering::Relaxed);
20 if depth > 0 {
21 NESTING_DEPTH.fetch_sub(1, Ordering::Relaxed);
22 return Err(WasmError::ProcessorError(
23 "recursive wasm calls not supported".to_string(),
24 ));
25 }
26
27 let registry = self.registry.clone();
28 let result = tokio::runtime::Handle::current().block_on(async {
29 let scheme = uri.split(':').next().unwrap_or("").to_string();
30 if scheme.is_empty() {
31 return Err(WasmError::ProcessorError(format!(
32 "invalid URI (no scheme): {}",
33 uri
34 )));
35 }
36
37 let component = {
38 let guard = registry
39 .lock()
40 .map_err(|e| WasmError::Io(format!("registry lock poisoned: {}", e)))?;
41 guard.get(&scheme).ok_or_else(|| {
42 WasmError::ProcessorError(format!("component not found for scheme: {}", scheme))
43 })?
44 };
45
46 let endpoint = component
47 .create_endpoint(&uri, &camel_component_api::NoOpComponentContext)
48 .map_err(|e| WasmError::ProcessorError(format!("create_endpoint failed: {}", e)))?;
49
50 let producer = endpoint
51 .create_producer(&camel_api::ProducerContext::new())
52 .map_err(|e| WasmError::ProcessorError(format!("create_producer failed: {}", e)))?;
53
54 let exchange =
55 camel_api::Exchange::new(camel_api::Message::new(camel_api::Body::Text(payload)));
56
57 let result = producer
58 .oneshot(exchange)
59 .await
60 .map_err(|e| WasmError::ProcessorError(format!("endpoint call failed: {}", e)))?;
61
62 let body_str = match &result.output {
63 Some(msg) => match &msg.body {
64 camel_api::Body::Text(s) => s.clone(),
65 camel_api::Body::Json(v) => v.to_string(),
66 camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
67 camel_api::Body::Xml(s) => s.clone(),
68 camel_api::Body::Empty => String::new(),
69 camel_api::Body::Stream(_) => "<stream>".to_string(),
70 },
71 None => match &result.input.body {
72 camel_api::Body::Text(s) => s.clone(),
73 camel_api::Body::Json(v) => v.to_string(),
74 camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
75 camel_api::Body::Xml(s) => s.clone(),
76 camel_api::Body::Empty => String::new(),
77 camel_api::Body::Stream(_) => "<stream>".to_string(),
78 },
79 };
80
81 Ok(body_str)
82 });
83
84 NESTING_DEPTH.fetch_sub(1, Ordering::Relaxed);
85 result
86 }
87
88 fn get_property(&mut self, key: String) -> Option<String> {
89 self.properties.get(&key).map(|v| match v {
90 Value::String(s) => s.clone(),
91 other => other.to_string(),
92 })
93 }
94
95 fn set_property(&mut self, key: String, value: String) {
96 let parsed = serde_json::from_str::<Value>(&value).unwrap_or(Value::String(value));
97 self.properties.insert(key, parsed);
98 }
99
100 fn host_store(&mut self, key: String, value: String) -> Result<(), WasmError> {
101 self.state_store.store(&key, &value).map_err(WasmError::Io)
102 }
103
104 fn host_load(&mut self, key: String) -> Result<Option<String>, WasmError> {
105 self.state_store.load(&key).map_err(WasmError::Io)
106 }
107}
108
109pub fn add_to_linker(linker: &mut Linker<WasmHostState>) -> Result<(), wasmtime::Error> {
110 crate::bindings::camel::plugin::host::add_to_linker::<_, HasSelf<_>>(linker, |state| state)
111}
112
113pub fn add_bean_to_linker(linker: &mut Linker<WasmHostState>) -> Result<(), wasmtime::Error> {
114 crate::bean_bindings::camel::plugin::host::add_to_linker::<_, HasSelf<_>>(linker, |state| state)
115}
116
117use crate::bean_bindings::camel::plugin::host::Host as BeanHost;
118use crate::bean_bindings::camel::plugin::types::WasmError as BeanWasmError;
119
120impl BeanHost for WasmHostState {
121 fn camel_call(&mut self, uri: String, payload: String) -> Result<String, BeanWasmError> {
122 let host = self as &mut dyn Host;
123 host.camel_call(uri, payload).map_err(|e| match e {
124 WasmError::ProcessorError(s) => BeanWasmError::ProcessorError(s),
125 WasmError::TypeConversion(s) => BeanWasmError::TypeConversion(s),
126 WasmError::Io(s) => BeanWasmError::Io(s),
127 WasmError::Timeout => BeanWasmError::Timeout,
128 })
129 }
130
131 fn get_property(&mut self, key: String) -> Option<String> {
132 let host = self as &mut dyn Host;
133 host.get_property(key)
134 }
135
136 fn set_property(&mut self, key: String, value: String) {
137 let host = self as &mut dyn Host;
138 host.set_property(key, value)
139 }
140
141 fn host_store(&mut self, key: String, value: String) -> Result<(), BeanWasmError> {
142 let host = self as &mut dyn Host;
143 host.host_store(key, value).map_err(|e| match e {
144 WasmError::Io(s) => BeanWasmError::Io(s),
145 other => BeanWasmError::Io(other.to_string()),
146 })
147 }
148
149 fn host_load(&mut self, key: String) -> Result<Option<String>, BeanWasmError> {
150 let host = self as &mut dyn Host;
151 host.host_load(key).map_err(|e| match e {
152 WasmError::Io(s) => BeanWasmError::Io(s),
153 other => BeanWasmError::Io(other.to_string()),
154 })
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::WasmHostState;
    use camel_core::Registry;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a host state with an empty registry, no properties and the given `call_depth`.
    fn make_state(call_depth: u32) -> WasmHostState {
        WasmHostState {
            table: wasmtime::component::ResourceTable::new(),
            wasi: wasmtime_wasi::WasiCtxBuilder::new()
                .inherit_stderr()
                .build(),
            properties: HashMap::new(),
            registry: Arc::new(std::sync::Mutex::new(Registry::new())),
            call_depth,
            limits: wasmtime::StoreLimits::default(),
            state_store: crate::state_store::StateStore::new(),
        }
    }

    // Both `Host` and `BeanHost` are implemented for `WasmHostState` with identical method
    // names, so the tests below call the `Host` methods with fully-qualified syntax.

    #[test]
    fn test_recursion_guard_blocks_nested_calls() {
        let mut state = make_state(1);

        // `camel_call` consults the process-wide NESTING_DEPTH counter, so simulate a call
        // that is already in flight by bumping it. The nested call is rejected before any
        // Tokio runtime is touched, so no runtime is needed here.
        // NOTE(review): the counter is shared with every test in this binary; the window is
        // kept as small as possible.
        NESTING_DEPTH.fetch_add(1, Ordering::Relaxed);
        let outcome = Host::camel_call(&mut state, "direct:nested".to_string(), String::new());
        NESTING_DEPTH.fetch_sub(1, Ordering::Relaxed);

        assert!(matches!(
            &outcome,
            Err(WasmError::ProcessorError(msg)) if msg == "recursive wasm calls not supported"
        ));
    }

    #[test]
    fn test_recursion_guard_allows_initial_call() {
        // Only the fixture wiring is checked: a successful `camel_call` needs a Tokio runtime
        // and a registered component, neither of which this unit test sets up.
        let state = make_state(0);
        assert_eq!(state.call_depth, 0);
    }

    #[test]
    fn test_get_property_string_value() {
        let mut state = make_state(0);
        state
            .properties
            .insert("key".to_string(), Value::String("value".to_string()));
        state.properties.insert("flag".to_string(), Value::Bool(true));

        // JSON strings come back unquoted.
        assert_eq!(
            Host::get_property(&mut state, "key".to_string()),
            Some("value".to_string())
        );
        // Non-string values come back in serialized form.
        assert_eq!(
            Host::get_property(&mut state, "flag".to_string()),
            Some("true".to_string())
        );
    }

    #[test]
    fn test_get_property_missing_key() {
        let mut state = make_state(0);
        assert_eq!(Host::get_property(&mut state, "missing".to_string()), None);
    }

    #[test]
    fn test_set_property_json_value() {
        let mut state = make_state(0);

        // Valid JSON is stored as a structured value.
        Host::set_property(
            &mut state,
            "json_key".to_string(),
            "{\"nested\":true}".to_string(),
        );
        assert!(state.properties.get("json_key").unwrap().is_object());

        // Text that is not valid JSON falls back to a plain JSON string.
        Host::set_property(&mut state, "plain".to_string(), "not json".to_string());
        assert_eq!(
            state.properties.get("plain"),
            Some(&Value::String("not json".to_string()))
        );
    }

    #[test]
    fn test_uri_scheme_parsing() {
        assert_eq!("direct".split(':').next().unwrap_or(""), "direct");
        assert_eq!("log:info".split(':').next().unwrap_or(""), "log");
        assert_eq!("noscheme".split(':').next().unwrap_or(""), "noscheme");
        assert_eq!("".split(':').next().unwrap_or(""), "");
    }
}