Skip to main content

camel_processor/
script_mutator.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::CamelError;
9use camel_api::exchange::Exchange;
10use camel_language_api::MutatingExpression;
11
12/// Processor that executes a mutating expression, allowing scripts to modify the Exchange.
13/// Uses `Arc<dyn MutatingExpression>` to enable `Clone` (required by `BoxProcessor`).
14#[derive(Clone)]
15pub struct ScriptMutator {
16    expression: Arc<dyn MutatingExpression>,
17}
18
19impl ScriptMutator {
20    pub fn new(expression: Box<dyn MutatingExpression>) -> Self {
21        Self {
22            expression: expression.into(),
23        }
24    }
25}
26
27impl Service<Exchange> for ScriptMutator {
28    type Response = Exchange;
29    type Error = CamelError;
30    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
31
32    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
33        Poll::Ready(Ok(()))
34    }
35
36    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
37        let result = self.expression.evaluate(&mut exchange);
38        Box::pin(async move { result.map(|_| exchange).map_err(language_err_to_camel) })
39    }
40}
41
42fn language_err_to_camel(e: camel_language_api::LanguageError) -> CamelError {
43    use camel_language_api::LanguageError;
44    match e {
45        LanguageError::EvalError(msg) => CamelError::ProcessorError(msg),
46        LanguageError::ParseError { expr, reason } => {
47            CamelError::ProcessorError(format!("parse error in `{expr}`: {reason}"))
48        }
49        LanguageError::NotSupported { feature, language } => CamelError::ProcessorError(format!(
50            "feature '{feature}' not supported by language '{language}'"
51        )),
52        other => CamelError::ProcessorError(other.to_string()),
53    }
54}
55
56#[cfg(test)]
57mod tests {
58    use camel_api::{CamelError, Exchange, Message, Value};
59    use camel_language_api::LanguageError;
60    use tower::ServiceExt;
61
62    use super::*;
63
64    /// A simple test mutating expression that sets a header
65    struct TestMutatingExpression;
66
67    struct ParseErrorMutatingExpression;
68    struct NotSupportedMutatingExpression;
69    struct UnknownVariableMutatingExpression;
70
71    impl MutatingExpression for TestMutatingExpression {
72        fn evaluate(&self, exchange: &mut Exchange) -> Result<Value, LanguageError> {
73            exchange
74                .input
75                .headers
76                .insert("mutated".into(), Value::Bool(true));
77            Ok(Value::Null)
78        }
79    }
80
81    impl MutatingExpression for ParseErrorMutatingExpression {
82        fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
83            Err(LanguageError::ParseError {
84                expr: "x".to_string(),
85                reason: "bad".to_string(),
86            })
87        }
88    }
89
90    impl MutatingExpression for NotSupportedMutatingExpression {
91        fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
92            Err(LanguageError::NotSupported {
93                feature: "f".to_string(),
94                language: "l".to_string(),
95            })
96        }
97    }
98
99    impl MutatingExpression for UnknownVariableMutatingExpression {
100        fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
101            Err(LanguageError::UnknownVariable("foo".to_string()))
102        }
103    }
104
105    #[tokio::test]
106    async fn test_script_mutator_modifies_exchange() {
107        let exchange = Exchange::new(Message::new("test"));
108
109        let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
110
111        let result = mutator.oneshot(exchange).await.unwrap();
112        assert_eq!(result.input.header("mutated"), Some(&Value::Bool(true)));
113    }
114
115    #[tokio::test]
116    async fn test_script_mutator_preserves_body() {
117        let exchange = Exchange::new(Message::new("original body"));
118
119        let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
120
121        let result = mutator.oneshot(exchange).await.unwrap();
122        assert_eq!(result.input.body.as_text(), Some("original body"));
123    }
124
125    #[tokio::test]
126    async fn test_script_mutator_is_clone() {
127        let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
128        let _cloned = mutator.clone();
129    }
130
131    #[tokio::test]
132    async fn test_script_mutator_maps_parse_error() {
133        let exchange = Exchange::new(Message::new("test"));
134        let mutator = ScriptMutator::new(Box::new(ParseErrorMutatingExpression));
135        let result = mutator.oneshot(exchange).await;
136        assert!(
137            matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "parse error in `x`: bad")
138        );
139    }
140
141    #[tokio::test]
142    async fn test_script_mutator_maps_not_supported_error() {
143        let exchange = Exchange::new(Message::new("test"));
144        let mutator = ScriptMutator::new(Box::new(NotSupportedMutatingExpression));
145        let result = mutator.oneshot(exchange).await;
146        assert!(
147            matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "feature 'f' not supported by language 'l'")
148        );
149    }
150
151    #[tokio::test]
152    async fn test_script_mutator_maps_other_language_error() {
153        let exchange = Exchange::new(Message::new("test"));
154        let mutator = ScriptMutator::new(Box::new(UnknownVariableMutatingExpression));
155        let result = mutator.oneshot(exchange).await;
156        assert!(
157            matches!(result, Err(CamelError::ProcessorError(msg)) if msg.contains("unknown variable: foo"))
158        );
159    }
160}