camel_processor/
choice.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
8
9pub struct WhenClause {
11 pub predicate: FilterPredicate,
12 pub pipeline: BoxProcessor,
13}
14
15impl Clone for WhenClause {
16 fn clone(&self) -> Self {
17 Self {
18 predicate: self.predicate.clone(),
19 pipeline: self.pipeline.clone(),
20 }
21 }
22}
23
24#[derive(Clone)]
30pub struct ChoiceService {
31 whens: Vec<WhenClause>,
32 otherwise: Option<BoxProcessor>,
33}
34
35impl ChoiceService {
36 pub fn new(whens: Vec<WhenClause>, otherwise: Option<BoxProcessor>) -> Self {
41 Self { whens, otherwise }
42 }
43}
44
45impl Service<Exchange> for ChoiceService {
46 type Response = Exchange;
47 type Error = CamelError;
48 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
49
50 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
51 Poll::Ready(Ok(()))
52 }
53
54 fn call(&mut self, exchange: Exchange) -> Self::Future {
55 for when in &mut self.whens {
56 if (when.predicate)(&exchange) {
57 let fut = when.pipeline.call(exchange);
58 return Box::pin(fut);
59 }
60 }
61 if let Some(otherwise) = &mut self.otherwise {
62 let fut = otherwise.call(exchange);
63 return Box::pin(fut);
64 }
65 Box::pin(async move { Ok(exchange) })
66 }
67}
68
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Body, BoxProcessorExt, Message, Value};
    use std::sync::Arc;
    use tower::ServiceExt;

    /// Processor that appends `suffix` to a text body and leaves any other
    /// body as it is.
    fn append_body(suffix: &'static str) -> BoxProcessor {
        BoxProcessor::from_fn(move |mut ex: Exchange| {
            Box::pin(async move {
                let appended = match &ex.input.body {
                    Body::Text(current) => Some(format!("{current}{suffix}")),
                    _ => None,
                };
                if let Some(text) = appended {
                    ex.input.body = Body::Text(text);
                }
                Ok(ex)
            })
        })
    }

    /// Processor that always fails with a `"boom"` processor error.
    fn failing() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Predicate that accepts exchanges carrying the header `name`.
    fn pred_header(name: &'static str) -> FilterPredicate {
        Arc::new(move |ex: &Exchange| ex.input.header(name).is_some())
    }

    /// Branch that fires on header `header` and appends `suffix` to the body.
    fn branch(header: &'static str, suffix: &'static str) -> WhenClause {
        WhenClause {
            predicate: pred_header(header),
            pipeline: append_body(suffix),
        }
    }

    /// The two-branch fixture shared by the matching tests: `a` then `b`.
    fn a_then_b() -> Vec<WhenClause> {
        vec![branch("a", "-A"), branch("b", "-B")]
    }

    /// Exchange with text body `body` and each of `headers` set to `true`.
    fn exchange(body: &'static str, headers: &[&'static str]) -> Exchange {
        let mut ex = Exchange::new(Message::new(body));
        for &name in headers {
            ex.input.set_header(name, Value::Bool(true));
        }
        ex
    }

    /// Waits for readiness, then sends `ex` through `svc`.
    async fn dispatch(svc: &mut ChoiceService, ex: Exchange) -> Result<Exchange, CamelError> {
        svc.ready().await.unwrap().call(ex).await
    }

    #[tokio::test]
    async fn test_choice_first_when_matches() {
        let mut svc = ChoiceService::new(a_then_b(), None);
        let routed = dispatch(&mut svc, exchange("x", &["a"])).await.unwrap();
        assert_eq!(routed.input.body.as_text(), Some("x-A"));
    }

    #[tokio::test]
    async fn test_choice_second_when_matches() {
        let mut svc = ChoiceService::new(a_then_b(), None);
        let routed = dispatch(&mut svc, exchange("x", &["b"])).await.unwrap();
        assert_eq!(routed.input.body.as_text(), Some("x-B"));
    }

    #[tokio::test]
    async fn test_choice_short_circuits_at_first_match() {
        // Both predicates accept this exchange; only the first branch may run.
        let mut svc = ChoiceService::new(a_then_b(), None);
        let routed = dispatch(&mut svc, exchange("x", &["a", "b"]))
            .await
            .unwrap();
        assert_eq!(routed.input.body.as_text(), Some("x-A"));
    }

    #[tokio::test]
    async fn test_choice_otherwise_fires_when_no_when_matches() {
        let mut svc = ChoiceService::new(vec![branch("a", "-A")], Some(append_body("-else")));
        let routed = dispatch(&mut svc, exchange("x", &[])).await.unwrap();
        assert_eq!(routed.input.body.as_text(), Some("x-else"));
    }

    #[tokio::test]
    async fn test_choice_no_match_no_otherwise_passthrough() {
        let mut svc = ChoiceService::new(vec![branch("a", "-A")], None);
        let routed = dispatch(&mut svc, exchange("untouched", &[]))
            .await
            .unwrap();
        assert_eq!(routed.input.body.as_text(), Some("untouched"));
    }

    #[tokio::test]
    async fn test_choice_error_in_when_propagates() {
        let failing_branch = WhenClause {
            predicate: pred_header("a"),
            pipeline: failing(),
        };
        let mut svc = ChoiceService::new(vec![failing_branch], None);
        let outcome = dispatch(&mut svc, exchange("x", &["a"])).await;
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("boom"));
    }

    #[tokio::test]
    async fn test_choice_error_in_otherwise_propagates() {
        let mut svc = ChoiceService::new(vec![], Some(failing()));
        let outcome = dispatch(&mut svc, exchange("x", &[])).await;
        assert!(outcome.is_err());
    }
}