camel_processor/
script_mutator.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::CamelError;
9use camel_api::exchange::Exchange;
10use camel_language_api::MutatingExpression;
11
12#[derive(Clone)]
15pub struct ScriptMutator {
16 expression: Arc<dyn MutatingExpression>,
17}
18
19impl ScriptMutator {
20 pub fn new(expression: Box<dyn MutatingExpression>) -> Self {
21 Self {
22 expression: expression.into(),
23 }
24 }
25}
26
27impl Service<Exchange> for ScriptMutator {
28 type Response = Exchange;
29 type Error = CamelError;
30 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
31
32 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
33 Poll::Ready(Ok(()))
34 }
35
36 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
37 let result = self.expression.evaluate(&mut exchange);
38 Box::pin(async move { result.map(|_| exchange).map_err(language_err_to_camel) })
39 }
40}
41
42fn language_err_to_camel(e: camel_language_api::LanguageError) -> CamelError {
43 use camel_language_api::LanguageError;
44 match e {
45 LanguageError::EvalError(msg) => CamelError::ProcessorError(msg),
46 LanguageError::ParseError { expr, reason } => {
47 CamelError::ProcessorError(format!("parse error in `{expr}`: {reason}"))
48 }
49 LanguageError::NotSupported { feature, language } => CamelError::ProcessorError(format!(
50 "feature '{feature}' not supported by language '{language}'"
51 )),
52 other => CamelError::ProcessorError(other.to_string()),
53 }
54}
55
#[cfg(test)]
mod tests {
    use camel_api::{Exchange, Message, Value};
    use camel_language_api::LanguageError;
    use tower::ServiceExt;

    use super::*;

    /// Test double that sets the `mutated` header to `true` and returns `Value::Null`.
    struct TestMutatingExpression;

    impl MutatingExpression for TestMutatingExpression {
        fn evaluate(&self, exchange: &mut Exchange) -> Result<Value, LanguageError> {
            let headers = &mut exchange.input.headers;
            headers.insert("mutated".into(), Value::Bool(true));
            Ok(Value::Null)
        }
    }

    /// Builds a `ScriptMutator` wrapping the test expression.
    fn new_mutator() -> ScriptMutator {
        ScriptMutator::new(Box::new(TestMutatingExpression))
    }

    #[tokio::test]
    async fn test_script_mutator_modifies_exchange() {
        let incoming = Exchange::new(Message::new("test"));

        let outgoing = new_mutator().oneshot(incoming).await.unwrap();

        // The expression's header mutation must be visible on the returned exchange.
        assert_eq!(outgoing.input.header("mutated"), Some(&Value::Bool(true)));
    }

    #[tokio::test]
    async fn test_script_mutator_preserves_body() {
        let incoming = Exchange::new(Message::new("original body"));

        let outgoing = new_mutator().oneshot(incoming).await.unwrap();

        // The body must be untouched by the mutator.
        assert_eq!(outgoing.input.body.as_text(), Some("original body"));
    }

    #[tokio::test]
    async fn test_script_mutator_is_clone() {
        // Compile-time check that the service can be cloned.
        let original = new_mutator();
        let _cloned = original.clone();
    }
}