camel_processor/
script_mutator.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::CamelError;
9use camel_api::exchange::Exchange;
10use camel_language_api::MutatingExpression;
11
12#[derive(Clone)]
15pub struct ScriptMutator {
16 expression: Arc<dyn MutatingExpression>,
17}
18
19impl ScriptMutator {
20 pub fn new(expression: Box<dyn MutatingExpression>) -> Self {
21 Self {
22 expression: expression.into(),
23 }
24 }
25}
26
27impl Service<Exchange> for ScriptMutator {
28 type Response = Exchange;
29 type Error = CamelError;
30 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
31
32 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
33 Poll::Ready(Ok(()))
34 }
35
36 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
37 let expression = self.expression.clone();
38 Box::pin(async move {
39 let result = expression.evaluate(&mut exchange).await;
40 result.map(|_| exchange).map_err(language_err_to_camel)
41 })
42 }
43}
44
45fn language_err_to_camel(e: camel_language_api::LanguageError) -> CamelError {
46 use camel_language_api::LanguageError;
47 match e {
48 LanguageError::EvalError(msg) => CamelError::ProcessorError(msg),
49 LanguageError::ParseError { expr, reason } => {
50 CamelError::ProcessorError(format!("parse error in `{expr}`: {reason}"))
51 }
52 LanguageError::NotSupported { feature, language } => CamelError::ProcessorError(format!(
53 "feature '{feature}' not supported by language '{language}'"
54 )),
55 other => CamelError::ProcessorError(other.to_string()),
56 }
57}
58
59#[cfg(test)]
60mod tests {
61 use camel_api::{CamelError, Exchange, Message, Value};
62 use camel_language_api::LanguageError;
63 use tower::ServiceExt;
64
65 use super::*;
66
67 struct TestMutatingExpression;
69
70 struct ParseErrorMutatingExpression;
71 struct NotSupportedMutatingExpression;
72 struct UnknownVariableMutatingExpression;
73
74 #[async_trait::async_trait]
75 impl MutatingExpression for TestMutatingExpression {
76 async fn evaluate(&self, exchange: &mut Exchange) -> Result<Value, LanguageError> {
77 exchange
78 .input
79 .headers
80 .insert("mutated".into(), Value::Bool(true));
81 Ok(Value::Null)
82 }
83 }
84
85 #[async_trait::async_trait]
86 impl MutatingExpression for ParseErrorMutatingExpression {
87 async fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
88 Err(LanguageError::ParseError {
89 expr: "x".to_string(),
90 reason: "bad".to_string(),
91 })
92 }
93 }
94
95 #[async_trait::async_trait]
96 impl MutatingExpression for NotSupportedMutatingExpression {
97 async fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
98 Err(LanguageError::NotSupported {
99 feature: "f".to_string(),
100 language: "l".to_string(),
101 })
102 }
103 }
104
105 #[async_trait::async_trait]
106 impl MutatingExpression for UnknownVariableMutatingExpression {
107 async fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
108 Err(LanguageError::UnknownVariable("foo".to_string()))
109 }
110 }
111
112 #[tokio::test]
113 async fn test_script_mutator_modifies_exchange() {
114 let exchange = Exchange::new(Message::new("test"));
115
116 let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
117
118 let result = mutator.oneshot(exchange).await.unwrap();
119 assert_eq!(result.input.header("mutated"), Some(&Value::Bool(true)));
120 }
121
122 #[tokio::test]
123 async fn test_script_mutator_preserves_body() {
124 let exchange = Exchange::new(Message::new("original body"));
125
126 let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
127
128 let result = mutator.oneshot(exchange).await.unwrap();
129 assert_eq!(result.input.body.as_text(), Some("original body"));
130 }
131
132 #[tokio::test]
133 async fn test_script_mutator_is_clone() {
134 let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
135 let _cloned = mutator.clone();
136 }
137
138 #[tokio::test]
139 async fn test_script_mutator_maps_parse_error() {
140 let exchange = Exchange::new(Message::new("test"));
141 let mutator = ScriptMutator::new(Box::new(ParseErrorMutatingExpression));
142 let result = mutator.oneshot(exchange).await;
143 assert!(
144 matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "parse error in `x`: bad")
145 );
146 }
147
148 #[tokio::test]
149 async fn test_script_mutator_maps_not_supported_error() {
150 let exchange = Exchange::new(Message::new("test"));
151 let mutator = ScriptMutator::new(Box::new(NotSupportedMutatingExpression));
152 let result = mutator.oneshot(exchange).await;
153 assert!(
154 matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "feature 'f' not supported by language 'l'")
155 );
156 }
157
158 #[tokio::test]
159 async fn test_script_mutator_maps_other_language_error() {
160 let exchange = Exchange::new(Message::new("test"));
161 let mutator = ScriptMutator::new(Box::new(UnknownVariableMutatingExpression));
162 let result = mutator.oneshot(exchange).await;
163 assert!(
164 matches!(result, Err(CamelError::ProcessorError(msg)) if msg.contains("unknown variable: foo"))
165 );
166 }
167}