camel_processor/
choice.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
9
10pub struct WhenClause {
12 pub predicate: FilterPredicate,
13 pub pipeline: BoxProcessor,
14}
15
16impl Clone for WhenClause {
17 fn clone(&self) -> Self {
18 Self {
19 predicate: self.predicate.clone(),
20 pipeline: self.pipeline.clone(),
21 }
22 }
23}
24
25#[derive(Clone)]
31pub struct ChoiceService {
32 whens: Vec<WhenClause>,
33 otherwise: Option<BoxProcessor>,
34}
35
36impl ChoiceService {
37 pub fn new(whens: Vec<WhenClause>, otherwise: Option<BoxProcessor>) -> Self {
42 Self { whens, otherwise }
43 }
44}
45
46impl Service<Exchange> for ChoiceService {
47 type Response = Exchange;
48 type Error = CamelError;
49 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
50
51 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
52 Poll::Ready(Ok(()))
53 }
54
55 fn call(&mut self, exchange: Exchange) -> Self::Future {
56 for when in &mut self.whens {
57 if (when.predicate)(&exchange) {
58 let fut = when.pipeline.call(exchange);
59 return Box::pin(fut);
60 }
61 }
62 if let Some(otherwise) = &mut self.otherwise {
63 let fut = otherwise.call(exchange);
64 return Box::pin(fut);
65 }
66 Box::pin(async move { Ok(exchange) })
67 }
68}
69
70pub struct WhenClauseSegment {
74 pub predicate: camel_api::FilterPredicate,
75 pub body: camel_api::OutcomeSegment,
76}
77
78impl Clone for WhenClauseSegment {
79 fn clone(&self) -> Self {
80 Self {
81 predicate: Arc::clone(&self.predicate),
82 body: self.body.clone(),
83 }
84 }
85}
86
87pub struct ChoiceSegment {
99 pub clauses: Vec<WhenClauseSegment>,
100 pub otherwise: Option<camel_api::OutcomeSegment>,
101}
102
103impl Clone for ChoiceSegment {
104 fn clone(&self) -> Self {
105 Self {
106 clauses: self
107 .clauses
108 .iter()
109 .map(|c| WhenClauseSegment {
110 predicate: Arc::clone(&c.predicate),
111 body: c.body.clone(),
112 })
113 .collect(),
114 otherwise: self.otherwise.clone(),
115 }
116 }
117}
118
119impl camel_api::OutcomePipeline for ChoiceSegment {
120 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
121 Box::new(self.clone())
122 }
123
124 fn run<'a>(
125 &'a mut self,
126 exchange: camel_api::Exchange,
127 ) -> Pin<Box<dyn Future<Output = camel_api::PipelineOutcome> + Send + 'a>> {
128 Box::pin(async move {
129 for clause in self.clauses.iter_mut() {
130 if (clause.predicate)(&exchange) {
131 return clause.body.run(exchange).await;
132 }
133 }
134 if let Some(otherwise) = self.otherwise.as_mut() {
135 otherwise.run(exchange).await
136 } else {
137 camel_api::PipelineOutcome::Completed(exchange)
138 }
139 })
140 }
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146 use camel_api::{Body, BoxProcessorExt, Message, Value};
147 use std::sync::Arc;
148 use tower::ServiceExt;
149
150 fn append_body(suffix: &'static str) -> BoxProcessor {
151 BoxProcessor::from_fn(move |mut ex: Exchange| {
152 Box::pin(async move {
153 if let Body::Text(s) = &ex.input.body {
154 ex.input.body = Body::Text(format!("{s}{suffix}"));
155 }
156 Ok(ex)
157 })
158 })
159 }
160
161 fn failing() -> BoxProcessor {
162 BoxProcessor::from_fn(|_ex| {
163 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
164 })
165 }
166
167 fn pred_header(name: &'static str) -> FilterPredicate {
168 Arc::new(move |ex: &Exchange| ex.input.header(name).is_some())
169 }
170
171 #[tokio::test]
173 async fn test_choice_first_when_matches() {
174 let whens = vec![
175 WhenClause {
176 predicate: pred_header("a"),
177 pipeline: append_body("-A"),
178 },
179 WhenClause {
180 predicate: pred_header("b"),
181 pipeline: append_body("-B"),
182 },
183 ];
184 let mut svc = ChoiceService::new(whens, None);
185 let mut ex = Exchange::new(Message::new("x"));
186 ex.input.set_header("a", Value::Bool(true));
187 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
188 assert_eq!(result.input.body.as_text(), Some("x-A"));
189 }
190
191 #[tokio::test]
193 async fn test_choice_second_when_matches() {
194 let whens = vec![
195 WhenClause {
196 predicate: pred_header("a"),
197 pipeline: append_body("-A"),
198 },
199 WhenClause {
200 predicate: pred_header("b"),
201 pipeline: append_body("-B"),
202 },
203 ];
204 let mut svc = ChoiceService::new(whens, None);
205 let mut ex = Exchange::new(Message::new("x"));
206 ex.input.set_header("b", Value::Bool(true));
207 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
208 assert_eq!(result.input.body.as_text(), Some("x-B"));
209 }
210
211 #[tokio::test]
213 async fn test_choice_short_circuits_at_first_match() {
214 let whens = vec![
215 WhenClause {
216 predicate: pred_header("a"),
217 pipeline: append_body("-A"),
218 },
219 WhenClause {
220 predicate: pred_header("b"),
221 pipeline: append_body("-B"),
222 },
223 ];
224 let mut svc = ChoiceService::new(whens, None);
225 let mut ex = Exchange::new(Message::new("x"));
226 ex.input.set_header("a", Value::Bool(true));
227 ex.input.set_header("b", Value::Bool(true));
228 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
229 assert_eq!(result.input.body.as_text(), Some("x-A"));
230 }
231
232 #[tokio::test]
234 async fn test_choice_otherwise_fires_when_no_when_matches() {
235 let whens = vec![WhenClause {
236 predicate: pred_header("a"),
237 pipeline: append_body("-A"),
238 }];
239 let mut svc = ChoiceService::new(whens, Some(append_body("-else")));
240 let ex = Exchange::new(Message::new("x"));
241 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
242 assert_eq!(result.input.body.as_text(), Some("x-else"));
243 }
244
245 #[tokio::test]
247 async fn test_choice_no_match_no_otherwise_passthrough() {
248 let whens = vec![WhenClause {
249 predicate: pred_header("a"),
250 pipeline: append_body("-A"),
251 }];
252 let mut svc = ChoiceService::new(whens, None);
253 let ex = Exchange::new(Message::new("untouched"));
254 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
255 assert_eq!(result.input.body.as_text(), Some("untouched"));
256 }
257
258 #[tokio::test]
260 async fn test_choice_error_in_when_propagates() {
261 let whens = vec![WhenClause {
262 predicate: pred_header("a"),
263 pipeline: failing(),
264 }];
265 let mut svc = ChoiceService::new(whens, None);
266 let mut ex = Exchange::new(Message::new("x"));
267 ex.input.set_header("a", Value::Bool(true));
268 let result = svc.ready().await.unwrap().call(ex).await;
269 assert!(result.is_err());
270 assert!(result.unwrap_err().to_string().contains("boom"));
271 }
272
273 #[tokio::test]
275 async fn test_choice_error_in_otherwise_propagates() {
276 let mut svc = ChoiceService::new(vec![], Some(failing()));
277 let ex = Exchange::new(Message::new("x"));
278 let result = svc.ready().await.unwrap().call(ex).await;
279 assert!(result.is_err());
280 }
281}