camel-component-wasm 0.9.0

WASM plugin component for rust-camel
Documentation
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use tower::Service;
use tracing::{debug, warn};

use camel_api::{CamelError, Exchange};
use camel_core::Registry;

/// Converts a poisoned-lock error into a `CamelError::ProcessorError`.
///
/// The error message is the poison error's `Display` output prefixed with
/// `lock poisoned: `. Intended for use with `map_err` on `Mutex::lock` results.
fn poisoned<T>(e: std::sync::PoisonError<T>) -> CamelError {
    let message = format!("lock poisoned: {}", e);
    CamelError::ProcessorError(message)
}

use crate::runtime::WasmRuntime;
use crate::serde_bridge::{exchange_to_wasm, wasm_to_exchange};

/// Producer that forwards exchanges to a WASM module through a `WasmRuntime`.
///
/// The type is `Clone`; the `registry` and `runtime` fields are `Arc`s, so
/// clones refer to the same registry and the same runtime slot.
#[derive(Clone)]
pub struct WasmProducer {
    /// Path of the WASM module handed to `WasmRuntime::new`.
    module_path: PathBuf,
    /// Registry handle passed to the runtime on init and on every process call.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Runtime slot: `None` after construction, filled on first use.
    runtime: Arc<std::sync::Mutex<Option<Arc<WasmRuntime>>>>,
    /// Configuration passed to `WasmRuntime::new` and exposed via `config()`.
    config: crate::config::WasmConfig,
    /// State store passed to the runtime on init and on every process call.
    // NOTE(review): whether clones of `StateStore` share state is not visible here — confirm.
    state_store: crate::state_store::StateStore,
}

impl WasmProducer {
    /// Creates a producer for the WASM module at `module_path`.
    ///
    /// No runtime is built here: the runtime slot starts out empty and is
    /// filled on first use. A new `StateStore` is created for the producer.
    pub fn new(
        module_path: PathBuf,
        registry: Arc<std::sync::Mutex<Registry>>,
        config: crate::config::WasmConfig,
    ) -> Self {
        Self {
            module_path,
            registry,
            runtime: Arc::new(std::sync::Mutex::new(None)),
            config,
            state_store: crate::state_store::StateStore::new(),
        }
    }

    /// Returns the configuration this producer was constructed with.
    pub fn config(&self) -> &crate::config::WasmConfig {
        &self.config
    }

    /// Returns the cached runtime, building and initializing it on first use.
    ///
    /// Every caller receives the instance stored in the shared slot. If two
    /// callers race on the first call, the one that loses discards its own
    /// freshly built runtime and returns the cached one instead.
    ///
    /// # Errors
    ///
    /// Fails if the runtime lock is poisoned, if `WasmRuntime::new` fails, or
    /// if `call_init_once` fails.
    async fn ensure_runtime(&self) -> Result<Arc<WasmRuntime>, CamelError> {
        // Fast path. The guard lives only inside this block so that it is
        // released before any `.await` below.
        {
            let guard = self.runtime.lock().map_err(poisoned)?;
            if let Some(rt) = guard.as_ref() {
                return Ok(Arc::clone(rt));
            }
        }

        let runtime = Arc::new(WasmRuntime::new(&self.module_path, self.config.clone()).await?);

        runtime
            .call_init_once(
                self.registry.clone(),
                HashMap::new(),
                self.state_store.clone(),
            )
            .await?;

        // The lock was not held while building, so another caller may have
        // filled the slot in the meantime. Keep whichever instance is already
        // there and hand that one back, so all callers share a single runtime.
        // Initialization itself may still have run once per racing caller.
        let mut guard = self.runtime.lock().map_err(poisoned)?;
        let cached: &Arc<WasmRuntime> = guard.get_or_insert(runtime);
        Ok(Arc::clone(cached))
    }
}

/// Exposes `WasmProducer` as a `tower::Service` over `Exchange` values.
impl Service<Exchange> for WasmProducer {
    type Response = Exchange;
    type Error = CamelError;
    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

    /// Always reports readiness; the runtime is obtained inside `call`.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Processes one exchange through the WASM module.
    ///
    /// The returned future obtains the runtime, converts the exchange with
    /// `exchange_to_wasm`, invokes `call_process`, and applies the result to
    /// the exchange with `wasm_to_exchange`. Failures are logged with `warn!`
    /// and returned as errors.
    fn call(&mut self, exchange: Exchange) -> Self::Future {
        // The future must own its data, so it works on a clone of the producer.
        let producer = self.clone();
        Box::pin(async move {
            let runtime = match producer.ensure_runtime().await {
                Ok(ready) => ready,
                Err(init_err) => {
                    warn!(
                        module = %producer.module_path.display(),
                        error = %init_err,
                        "Failed to initialize WASM runtime"
                    );
                    return Err(init_err);
                }
            };

            let guest_input = exchange_to_wasm(&exchange);
            let registry = producer.registry.clone();
            let properties = exchange.properties.clone();
            let state = producer.state_store.clone();

            let guest_output = match runtime
                .call_process(registry, properties, state, guest_input)
                .await
            {
                Ok(output) => output,
                Err(guest_err) => {
                    warn!(
                        module = %producer.module_path.display(),
                        error = %guest_err,
                        "WASM guest error"
                    );
                    return Err(guest_err.into());
                }
            };

            // Apply the guest's result onto the original exchange.
            let mut out = exchange;
            wasm_to_exchange(guest_output, &mut out);
            debug!(
                module = %producer.module_path.display(),
                "WASM producer completed successfully"
            );
            Ok(out)
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::WasmConfig;

    /// Builds a producer for a placeholder module path with a new registry.
    fn producer_with(config: WasmConfig) -> WasmProducer {
        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
        WasmProducer::new(PathBuf::from("test.wasm"), registry, config)
    }

    /// The configuration passed to `new` is what `config()` returns.
    #[test]
    fn test_producer_stores_config() {
        let producer = producer_with(WasmConfig {
            timeout_secs: 5,
            max_memory_bytes: 1024,
        });
        let stored = producer.config();
        assert_eq!(stored.timeout_secs, 5);
        assert_eq!(stored.max_memory_bytes, 1024);
    }

    /// A producer can be cloned.
    #[test]
    fn test_producer_is_clone() {
        let producer = producer_with(WasmConfig::default());
        let _cloned = producer.clone();
    }
}