use serde_json::Value;
use tower::ServiceExt;
use wasmtime::component::{HasSelf, Linker};
use crate::bindings::camel::plugin::host::Host;
use crate::bindings::camel::plugin::types::WasmError;
use crate::runtime::WasmHostState;
impl Host for WasmHostState {
fn camel_call(&mut self, uri: String, payload: String) -> Result<String, WasmError> {
if self.call_depth > 0 {
return Err(WasmError::ProcessorError(
"recursive wasm calls not supported".to_string(),
));
}
self.call_depth += 1;
let registry = self.registry.clone();
let async_work = async {
let scheme = uri.split(':').next().unwrap_or("").to_string();
if scheme.is_empty() {
return Err(WasmError::ProcessorError(format!(
"invalid URI (no scheme): {}",
uri
)));
}
let component = {
let guard = registry
.lock()
.map_err(|e| WasmError::Io(format!("registry lock poisoned: {}", e)))?;
guard.get(&scheme).ok_or_else(|| {
WasmError::ProcessorError(format!("component not found for scheme: {}", scheme))
})?
};
let endpoint = component
.create_endpoint(&uri, &camel_component_api::NoOpComponentContext)
.map_err(|e| WasmError::ProcessorError(format!("create_endpoint failed: {}", e)))?;
let rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability> =
std::sync::Arc::new(camel_component_api::NoOpComponentContext);
let producer = endpoint
.create_producer(rt, &camel_api::ProducerContext::new())
.map_err(|e| WasmError::ProcessorError(format!("create_producer failed: {}", e)))?;
let exchange =
camel_api::Exchange::new(camel_api::Message::new(camel_api::Body::Text(payload)));
let result = producer
.oneshot(exchange)
.await
.map_err(|e| WasmError::ProcessorError(format!("endpoint call failed: {}", e)))?;
let body_str = match &result.output {
Some(msg) => match &msg.body {
camel_api::Body::Text(s) => s.clone(),
camel_api::Body::Json(v) => v.to_string(),
camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
camel_api::Body::Xml(s) => s.clone(),
camel_api::Body::Empty => String::new(),
camel_api::Body::Stream(_) => "<stream>".to_string(),
},
None => match &result.input.body {
camel_api::Body::Text(s) => s.clone(),
camel_api::Body::Json(v) => v.to_string(),
camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
camel_api::Body::Xml(s) => s.clone(),
camel_api::Body::Empty => String::new(),
camel_api::Body::Stream(_) => "<stream>".to_string(),
},
};
Ok(body_str)
};
let handle = self.tokio_handle.clone();
let result = tokio::task::block_in_place(|| handle.block_on(async_work));
self.call_depth -= 1;
result
}
fn camel_poll(&mut self, uri: String, timeout_ms: u32) -> Result<String, WasmError> {
if self.call_depth > 0 {
return Err(WasmError::ProcessorError(
"recursive wasm calls not supported".to_string(),
));
}
self.call_depth += 1;
let registry = self.registry.clone();
let async_work = async {
let scheme = uri.split(':').next().unwrap_or("").to_string();
if scheme.is_empty() {
return Err(WasmError::ProcessorError(format!(
"invalid URI (no scheme): {}",
uri
)));
}
let component = {
let guard = registry
.lock()
.map_err(|e| WasmError::Io(format!("registry lock poisoned: {}", e)))?;
guard.get(&scheme).ok_or_else(|| {
WasmError::ProcessorError(format!("component not found for scheme: {}", scheme))
})?
};
let endpoint = component
.create_endpoint(&uri, &camel_component_api::NoOpComponentContext)
.map_err(|e| WasmError::ProcessorError(format!("create_endpoint failed: {}", e)))?;
let mut poller = endpoint.polling_consumer().ok_or_else(|| {
WasmError::ProcessorError(format!(
"camel_poll requires a component that supports polling consumers (scheme: {})",
scheme
))
})?;
let exchange = poller
.receive(std::time::Duration::from_millis(timeout_ms as u64))
.await
.map_err(|e| WasmError::ProcessorError(format!("poll failed: {}", e)))?;
let body_str = match exchange {
Some(ex) => {
let bytes = ex
.input
.body
.into_bytes(10 * 1024 * 1024)
.await
.map_err(|e| {
WasmError::ProcessorError(format!("body read failed: {}", e))
})?;
String::from_utf8_lossy(&bytes).to_string()
}
None => {
return Err(WasmError::ProcessorError(format!(
"no message received within {}ms timeout",
timeout_ms
)));
}
};
Ok(body_str)
};
let handle = self.tokio_handle.clone();
let result = tokio::task::block_in_place(|| handle.block_on(async_work));
self.call_depth -= 1;
result
}
fn get_property(&mut self, key: String) -> Option<String> {
self.properties.get(&key).map(|v| match v {
Value::String(s) => s.clone(),
other => other.to_string(),
})
}
fn set_property(&mut self, key: String, value: String) {
let parsed = serde_json::from_str::<Value>(&value).unwrap_or(Value::String(value));
self.properties.insert(key, parsed);
}
fn host_store(&mut self, key: String, value: String) -> Result<(), WasmError> {
self.state_store.store(&key, &value).map_err(WasmError::Io)
}
fn host_load(&mut self, key: String) -> Result<Option<String>, WasmError> {
self.state_store.load(&key).map_err(WasmError::Io)
}
}
pub fn add_to_linker(linker: &mut Linker<WasmHostState>) -> Result<(), wasmtime::Error> {
crate::bindings::camel::plugin::host::add_to_linker::<_, HasSelf<_>>(linker, |state| state)
}
macro_rules! impl_host_for_binding {
($bindings_mod:ident) => {
impl crate::$bindings_mod::camel::plugin::host::Host for WasmHostState {
fn camel_call(
&mut self,
uri: String,
payload: String,
) -> Result<String, crate::$bindings_mod::camel::plugin::types::WasmError> {
let host = self as &mut dyn Host;
host.camel_call(uri, payload).map_err(|e| match e {
WasmError::ProcessorError(s) => {
crate::$bindings_mod::camel::plugin::types::WasmError::ProcessorError(s)
}
WasmError::TypeConversion(s) => {
crate::$bindings_mod::camel::plugin::types::WasmError::TypeConversion(s)
}
WasmError::Io(s) => {
crate::$bindings_mod::camel::plugin::types::WasmError::Io(s)
}
WasmError::Timeout => {
crate::$bindings_mod::camel::plugin::types::WasmError::Timeout
}
})
}
fn camel_poll(
&mut self,
uri: String,
timeout_ms: u32,
) -> Result<String, crate::$bindings_mod::camel::plugin::types::WasmError> {
let host = self as &mut dyn Host;
host.camel_poll(uri, timeout_ms).map_err(|e| match e {
WasmError::ProcessorError(s) => {
crate::$bindings_mod::camel::plugin::types::WasmError::ProcessorError(s)
}
WasmError::TypeConversion(s) => {
crate::$bindings_mod::camel::plugin::types::WasmError::TypeConversion(s)
}
WasmError::Io(s) => {
crate::$bindings_mod::camel::plugin::types::WasmError::Io(s)
}
WasmError::Timeout => {
crate::$bindings_mod::camel::plugin::types::WasmError::Timeout
}
})
}
fn get_property(&mut self, key: String) -> Option<String> {
let host = self as &mut dyn Host;
host.get_property(key)
}
fn set_property(&mut self, key: String, value: String) {
let host = self as &mut dyn Host;
host.set_property(key, value)
}
fn host_store(
&mut self,
key: String,
value: String,
) -> Result<(), crate::$bindings_mod::camel::plugin::types::WasmError> {
let host = self as &mut dyn Host;
host.host_store(key, value).map_err(|e| match e {
WasmError::Io(s) => {
crate::$bindings_mod::camel::plugin::types::WasmError::Io(s)
}
other => {
crate::$bindings_mod::camel::plugin::types::WasmError::Io(other.to_string())
}
})
}
fn host_load(
&mut self,
key: String,
) -> Result<Option<String>, crate::$bindings_mod::camel::plugin::types::WasmError> {
let host = self as &mut dyn Host;
host.host_load(key).map_err(|e| match e {
WasmError::Io(s) => {
crate::$bindings_mod::camel::plugin::types::WasmError::Io(s)
}
other => {
crate::$bindings_mod::camel::plugin::types::WasmError::Io(other.to_string())
}
})
}
}
};
}
pub fn add_bean_to_linker(linker: &mut Linker<WasmHostState>) -> Result<(), wasmtime::Error> {
crate::bean_bindings::camel::plugin::host::add_to_linker::<_, HasSelf<_>>(linker, |state| state)
}
impl_host_for_binding!(bean_bindings);
pub fn add_security_policy_to_linker(
linker: &mut Linker<WasmHostState>,
) -> Result<(), wasmtime::Error> {
crate::security_policy_bindings::camel::plugin::host::add_to_linker::<_, HasSelf<_>>(
linker,
|state| state,
)
}
impl_host_for_binding!(security_policy_bindings);
#[cfg(test)]
mod tests {
use super::*;
use camel_core::Registry;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::OnceLock;
fn test_tokio_handle() -> tokio::runtime::Handle {
static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
RT.get_or_init(|| tokio::runtime::Runtime::new().expect("test runtime"))
.handle()
.clone()
}
fn make_state(call_depth: u32) -> WasmHostState {
WasmHostState {
table: wasmtime::component::ResourceTable::new(),
wasi: wasmtime_wasi::WasiCtxBuilder::new()
.inherit_stderr()
.build(),
properties: HashMap::new(),
registry: Arc::new(std::sync::Mutex::new(Registry::new())),
call_depth,
limits: wasmtime::StoreLimits::default(),
state_store: crate::state_store::StateStore::new(),
tokio_handle: test_tokio_handle(),
}
}
#[test]
fn test_recursion_guard_blocks_nested_calls() {
let state = make_state(1);
assert!(state.call_depth > 0);
}
#[test]
fn test_recursion_guard_allows_initial_call() {
let state = make_state(0);
assert_eq!(state.call_depth, 0);
}
#[test]
fn test_get_property_string_value() {
let mut state = make_state(0);
state
.properties
.insert("key".to_string(), Value::String("value".to_string()));
let value = state.properties.get("key").map(|v| match v {
Value::String(s) => s.clone(),
other => other.to_string(),
});
assert_eq!(value, Some("value".to_string()));
}
#[test]
fn test_get_property_missing_key() {
let state = make_state(0);
assert!(!state.properties.contains_key("missing"));
}
#[test]
fn test_set_property_json_value() {
let mut state = make_state(0);
let parsed = serde_json::from_str::<Value>("{\"nested\":true}")
.unwrap_or(Value::String("{\"nested\":true}".to_string()));
state.properties.insert("json_key".to_string(), parsed);
assert!(state.properties.get("json_key").unwrap().is_object());
}
#[test]
fn test_uri_scheme_parsing() {
assert_eq!("direct".split(':').next().unwrap_or(""), "direct");
assert_eq!("log:info".split(':').next().unwrap_or(""), "log");
assert_eq!("noscheme".split(':').next().unwrap_or(""), "noscheme");
assert_eq!("".split(':').next().unwrap_or(""), "");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_camel_call_does_not_panic_inside_tokio_runtime() {
let mut state = make_state(0);
let result = Host::camel_call(&mut state, "noscheme".to_string(), "{}".to_string());
assert!(
result.is_err(),
"should return error for empty scheme, not panic"
);
}
}