Skip to main content

camel_processor/
script_mutator.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::CamelError;
9use camel_api::exchange::Exchange;
10use camel_language_api::MutatingExpression;
11
12/// Processor that executes a mutating expression, allowing scripts to modify the Exchange.
13/// Uses `Arc<dyn MutatingExpression>` to enable `Clone` (required by `BoxProcessor`).
14#[derive(Clone)]
15pub struct ScriptMutator {
16    expression: Arc<dyn MutatingExpression>,
17}
18
19impl ScriptMutator {
20    pub fn new(expression: Box<dyn MutatingExpression>) -> Self {
21        Self {
22            expression: expression.into(),
23        }
24    }
25}
26
27impl Service<Exchange> for ScriptMutator {
28    type Response = Exchange;
29    type Error = CamelError;
30    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
31
32    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
33        Poll::Ready(Ok(()))
34    }
35
36    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
37        let result = self.expression.evaluate(&mut exchange);
38        Box::pin(async move { result.map(|_| exchange).map_err(language_err_to_camel) })
39    }
40}
41
42fn language_err_to_camel(e: camel_language_api::LanguageError) -> CamelError {
43    use camel_language_api::LanguageError;
44    match e {
45        LanguageError::EvalError(msg) => CamelError::ProcessorError(msg),
46        LanguageError::ParseError { expr, reason } => {
47            CamelError::ProcessorError(format!("parse error in `{expr}`: {reason}"))
48        }
49        LanguageError::NotSupported { feature, language } => CamelError::ProcessorError(format!(
50            "feature '{feature}' not supported by language '{language}'"
51        )),
52        other => CamelError::ProcessorError(other.to_string()),
53    }
54}
55
56#[cfg(test)]
57mod tests {
58    use camel_api::{Exchange, Message, Value};
59    use camel_language_api::LanguageError;
60    use tower::ServiceExt;
61
62    use super::*;
63
64    /// A simple test mutating expression that sets a header
65    struct TestMutatingExpression;
66
67    impl MutatingExpression for TestMutatingExpression {
68        fn evaluate(&self, exchange: &mut Exchange) -> Result<Value, LanguageError> {
69            exchange
70                .input
71                .headers
72                .insert("mutated".into(), Value::Bool(true));
73            Ok(Value::Null)
74        }
75    }
76
77    #[tokio::test]
78    async fn test_script_mutator_modifies_exchange() {
79        let exchange = Exchange::new(Message::new("test"));
80
81        let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
82
83        let result = mutator.oneshot(exchange).await.unwrap();
84        assert_eq!(result.input.header("mutated"), Some(&Value::Bool(true)));
85    }
86
87    #[tokio::test]
88    async fn test_script_mutator_preserves_body() {
89        let exchange = Exchange::new(Message::new("original body"));
90
91        let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
92
93        let result = mutator.oneshot(exchange).await.unwrap();
94        assert_eq!(result.input.body.as_text(), Some("original body"));
95    }
96
97    #[tokio::test]
98    async fn test_script_mutator_is_clone() {
99        let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
100        let _cloned = mutator.clone();
101        // If this compiles, Clone is implemented correctly via Arc
102    }
103}