// camel_processor/script_mutator.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::CamelError;
9use camel_api::exchange::Exchange;
10use camel_language_api::MutatingExpression;
11
12/// Processor that executes a mutating expression, allowing scripts to modify the Exchange.
13/// Uses `Arc<dyn MutatingExpression>` to enable `Clone` (required by `BoxProcessor`).
14#[derive(Clone)]
15pub struct ScriptMutator {
16    expression: Arc<dyn MutatingExpression>,
17}
18
19impl ScriptMutator {
20    pub fn new(expression: Box<dyn MutatingExpression>) -> Self {
21        Self {
22            expression: expression.into(),
23        }
24    }
25}
26
27impl Service<Exchange> for ScriptMutator {
28    type Response = Exchange;
29    type Error = CamelError;
30    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
31
32    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
33        Poll::Ready(Ok(()))
34    }
35
36    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
37        let expression = self.expression.clone();
38        Box::pin(async move {
39            let result = expression.evaluate(&mut exchange).await;
40            result.map(|_| exchange).map_err(language_err_to_camel)
41        })
42    }
43}
44
45fn language_err_to_camel(e: camel_language_api::LanguageError) -> CamelError {
46    use camel_language_api::LanguageError;
47    match e {
48        LanguageError::EvalError(msg) => CamelError::ProcessorError(msg),
49        LanguageError::ParseError { expr, reason } => {
50            CamelError::ProcessorError(format!("parse error in `{expr}`: {reason}"))
51        }
52        LanguageError::NotSupported { feature, language } => CamelError::ProcessorError(format!(
53            "feature '{feature}' not supported by language '{language}'"
54        )),
55        other => CamelError::ProcessorError(other.to_string()),
56    }
57}
58
#[cfg(test)]
mod tests {
    use camel_api::{CamelError, Exchange, Message, Value};
    use camel_language_api::LanguageError;
    use tower::ServiceExt;

    use super::*;

    /// Expression that succeeds and sets the `mutated` header to `true`.
    struct TestMutatingExpression;

    /// Expression that always fails with `LanguageError::EvalError`.
    struct EvalErrorMutatingExpression;
    /// Expression that always fails with `LanguageError::ParseError`.
    struct ParseErrorMutatingExpression;
    /// Expression that always fails with `LanguageError::NotSupported`.
    struct NotSupportedMutatingExpression;
    /// Expression that always fails with `LanguageError::UnknownVariable`,
    /// used to exercise the catch-all arm of the error mapping.
    struct UnknownVariableMutatingExpression;

    #[async_trait::async_trait]
    impl MutatingExpression for TestMutatingExpression {
        async fn evaluate(&self, exchange: &mut Exchange) -> Result<Value, LanguageError> {
            exchange
                .input
                .headers
                .insert("mutated".into(), Value::Bool(true));
            Ok(Value::Null)
        }
    }

    #[async_trait::async_trait]
    impl MutatingExpression for EvalErrorMutatingExpression {
        async fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
            Err(LanguageError::EvalError("boom".to_string()))
        }
    }

    #[async_trait::async_trait]
    impl MutatingExpression for ParseErrorMutatingExpression {
        async fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
            Err(LanguageError::ParseError {
                expr: "x".to_string(),
                reason: "bad".to_string(),
            })
        }
    }

    #[async_trait::async_trait]
    impl MutatingExpression for NotSupportedMutatingExpression {
        async fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
            Err(LanguageError::NotSupported {
                feature: "f".to_string(),
                language: "l".to_string(),
            })
        }
    }

    #[async_trait::async_trait]
    impl MutatingExpression for UnknownVariableMutatingExpression {
        async fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
            Err(LanguageError::UnknownVariable("foo".to_string()))
        }
    }

    #[tokio::test]
    async fn test_script_mutator_modifies_exchange() {
        let exchange = Exchange::new(Message::new("test"));

        let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));

        let result = mutator.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.header("mutated"), Some(&Value::Bool(true)));
    }

    #[tokio::test]
    async fn test_script_mutator_preserves_body() {
        let exchange = Exchange::new(Message::new("original body"));

        let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));

        let result = mutator.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("original body"));
    }

    #[tokio::test]
    async fn test_script_mutator_is_clone() {
        let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
        let cloned = mutator.clone();

        // Run both handles: the clone must be a working processor, not merely
        // something that type-checks.
        let from_original = mutator
            .oneshot(Exchange::new(Message::new("test")))
            .await
            .unwrap();
        let from_clone = cloned
            .oneshot(Exchange::new(Message::new("test")))
            .await
            .unwrap();

        assert_eq!(
            from_original.input.header("mutated"),
            Some(&Value::Bool(true))
        );
        assert_eq!(
            from_clone.input.header("mutated"),
            Some(&Value::Bool(true))
        );
    }

    #[tokio::test]
    async fn test_script_mutator_maps_eval_error() {
        let exchange = Exchange::new(Message::new("test"));
        let mutator = ScriptMutator::new(Box::new(EvalErrorMutatingExpression));
        let result = mutator.oneshot(exchange).await;
        // The evaluation message must be forwarded without decoration.
        assert!(matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "boom"));
    }

    #[tokio::test]
    async fn test_script_mutator_maps_parse_error() {
        let exchange = Exchange::new(Message::new("test"));
        let mutator = ScriptMutator::new(Box::new(ParseErrorMutatingExpression));
        let result = mutator.oneshot(exchange).await;
        assert!(
            matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "parse error in `x`: bad")
        );
    }

    #[tokio::test]
    async fn test_script_mutator_maps_not_supported_error() {
        let exchange = Exchange::new(Message::new("test"));
        let mutator = ScriptMutator::new(Box::new(NotSupportedMutatingExpression));
        let result = mutator.oneshot(exchange).await;
        assert!(
            matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "feature 'f' not supported by language 'l'")
        );
    }

    #[tokio::test]
    async fn test_script_mutator_maps_other_language_error() {
        let exchange = Exchange::new(Message::new("test"));
        let mutator = ScriptMutator::new(Box::new(UnknownVariableMutatingExpression));
        let result = mutator.oneshot(exchange).await;
        assert!(
            matches!(result, Err(CamelError::ProcessorError(msg)) if msg.contains("unknown variable: foo"))
        );
    }
}