camel_processor/
script_mutator.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::CamelError;
9use camel_api::exchange::Exchange;
10use camel_language_api::MutatingExpression;
11
12#[derive(Clone)]
15pub struct ScriptMutator {
16 expression: Arc<dyn MutatingExpression>,
17}
18
19impl ScriptMutator {
20 pub fn new(expression: Box<dyn MutatingExpression>) -> Self {
21 Self {
22 expression: expression.into(),
23 }
24 }
25}
26
27impl Service<Exchange> for ScriptMutator {
28 type Response = Exchange;
29 type Error = CamelError;
30 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
31
32 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
33 Poll::Ready(Ok(()))
34 }
35
36 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
37 let result = self.expression.evaluate(&mut exchange);
38 Box::pin(async move { result.map(|_| exchange).map_err(language_err_to_camel) })
39 }
40}
41
42fn language_err_to_camel(e: camel_language_api::LanguageError) -> CamelError {
43 use camel_language_api::LanguageError;
44 match e {
45 LanguageError::EvalError(msg) => CamelError::ProcessorError(msg),
46 LanguageError::ParseError { expr, reason } => {
47 CamelError::ProcessorError(format!("parse error in `{expr}`: {reason}"))
48 }
49 LanguageError::NotSupported { feature, language } => CamelError::ProcessorError(format!(
50 "feature '{feature}' not supported by language '{language}'"
51 )),
52 other => CamelError::ProcessorError(other.to_string()),
53 }
54}
55
56#[cfg(test)]
57mod tests {
58 use camel_api::{CamelError, Exchange, Message, Value};
59 use camel_language_api::LanguageError;
60 use tower::ServiceExt;
61
62 use super::*;
63
64 struct TestMutatingExpression;
66
67 struct ParseErrorMutatingExpression;
68 struct NotSupportedMutatingExpression;
69 struct UnknownVariableMutatingExpression;
70
71 impl MutatingExpression for TestMutatingExpression {
72 fn evaluate(&self, exchange: &mut Exchange) -> Result<Value, LanguageError> {
73 exchange
74 .input
75 .headers
76 .insert("mutated".into(), Value::Bool(true));
77 Ok(Value::Null)
78 }
79 }
80
81 impl MutatingExpression for ParseErrorMutatingExpression {
82 fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
83 Err(LanguageError::ParseError {
84 expr: "x".to_string(),
85 reason: "bad".to_string(),
86 })
87 }
88 }
89
90 impl MutatingExpression for NotSupportedMutatingExpression {
91 fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
92 Err(LanguageError::NotSupported {
93 feature: "f".to_string(),
94 language: "l".to_string(),
95 })
96 }
97 }
98
99 impl MutatingExpression for UnknownVariableMutatingExpression {
100 fn evaluate(&self, _exchange: &mut Exchange) -> Result<Value, LanguageError> {
101 Err(LanguageError::UnknownVariable("foo".to_string()))
102 }
103 }
104
105 #[tokio::test]
106 async fn test_script_mutator_modifies_exchange() {
107 let exchange = Exchange::new(Message::new("test"));
108
109 let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
110
111 let result = mutator.oneshot(exchange).await.unwrap();
112 assert_eq!(result.input.header("mutated"), Some(&Value::Bool(true)));
113 }
114
115 #[tokio::test]
116 async fn test_script_mutator_preserves_body() {
117 let exchange = Exchange::new(Message::new("original body"));
118
119 let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
120
121 let result = mutator.oneshot(exchange).await.unwrap();
122 assert_eq!(result.input.body.as_text(), Some("original body"));
123 }
124
125 #[tokio::test]
126 async fn test_script_mutator_is_clone() {
127 let mutator = ScriptMutator::new(Box::new(TestMutatingExpression));
128 let _cloned = mutator.clone();
129 }
130
131 #[tokio::test]
132 async fn test_script_mutator_maps_parse_error() {
133 let exchange = Exchange::new(Message::new("test"));
134 let mutator = ScriptMutator::new(Box::new(ParseErrorMutatingExpression));
135 let result = mutator.oneshot(exchange).await;
136 assert!(
137 matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "parse error in `x`: bad")
138 );
139 }
140
141 #[tokio::test]
142 async fn test_script_mutator_maps_not_supported_error() {
143 let exchange = Exchange::new(Message::new("test"));
144 let mutator = ScriptMutator::new(Box::new(NotSupportedMutatingExpression));
145 let result = mutator.oneshot(exchange).await;
146 assert!(
147 matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "feature 'f' not supported by language 'l'")
148 );
149 }
150
151 #[tokio::test]
152 async fn test_script_mutator_maps_other_language_error() {
153 let exchange = Exchange::new(Message::new("test"));
154 let mutator = ScriptMutator::new(Box::new(UnknownVariableMutatingExpression));
155 let result = mutator.oneshot(exchange).await;
156 assert!(
157 matches!(result, Err(CamelError::ProcessorError(msg)) if msg.contains("unknown variable: foo"))
158 );
159 }
160}