camel_processor/
filter.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
9
10#[derive(Clone)]
15pub struct FilterService {
16 predicate: FilterPredicate,
17 sub_pipeline: BoxProcessor,
18}
19
20impl FilterService {
21 pub fn new(
23 predicate: impl Fn(&Exchange) -> bool + Send + Sync + 'static,
24 sub_pipeline: BoxProcessor,
25 ) -> Self {
26 Self {
27 predicate: Arc::new(predicate),
28 sub_pipeline,
29 }
30 }
31
32 pub fn from_predicate(predicate: FilterPredicate, sub_pipeline: BoxProcessor) -> Self {
34 Self {
35 predicate,
36 sub_pipeline,
37 }
38 }
39}
40
41impl Service<Exchange> for FilterService {
42 type Response = Exchange;
43 type Error = CamelError;
44 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
45
46 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47 self.sub_pipeline.poll_ready(cx)
48 }
49
50 fn call(&mut self, exchange: Exchange) -> Self::Future {
51 if (self.predicate)(&exchange) {
52 let fut = self.sub_pipeline.call(exchange);
53 Box::pin(fut)
54 } else {
55 Box::pin(async move { Ok(exchange) })
56 }
57 }
58}
59
60pub struct FilterSegment {
74 pub predicate: camel_api::FilterPredicate,
75 pub body: camel_api::OutcomeSegment,
76}
77
78impl Clone for FilterSegment {
79 fn clone(&self) -> Self {
80 Self {
81 predicate: Arc::clone(&self.predicate),
82 body: self.body.clone(),
83 }
84 }
85}
86
87impl camel_api::OutcomePipeline for FilterSegment {
88 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
89 Box::new(self.clone())
90 }
91
92 fn run<'a>(
93 &'a mut self,
94 exchange: camel_api::Exchange,
95 ) -> Pin<Box<dyn Future<Output = camel_api::PipelineOutcome> + Send + 'a>> {
96 Box::pin(async move {
97 if (self.predicate)(&exchange) {
98 self.body.run(exchange).await
99 } else {
100 camel_api::PipelineOutcome::Completed(exchange)
101 }
102 })
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Body, BoxProcessorExt, Message, Value};
    use tower::ServiceExt;

    /// Sub-pipeline that hands the exchange back untouched.
    fn passthrough() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    /// Sub-pipeline that upper-cases text bodies and leaves other bodies alone.
    fn uppercase_body() -> BoxProcessor {
        BoxProcessor::from_fn(|mut ex: Exchange| {
            Box::pin(async move {
                if let Body::Text(s) = &ex.input.body {
                    ex.input.body = Body::Text(s.to_uppercase());
                }
                Ok(ex)
            })
        })
    }

    /// Sub-pipeline that always fails with a "boom" processor error.
    fn failing() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Waits for the service to become ready, then sends one exchange through it.
    async fn dispatch(svc: &mut FilterService, ex: Exchange) -> Result<Exchange, CamelError> {
        svc.ready().await.unwrap().call(ex).await
    }

    #[tokio::test]
    async fn test_filter_passes_matching_exchange() {
        let mut svc = FilterService::new(
            |ex: &Exchange| ex.input.header("active").is_some(),
            uppercase_body(),
        );

        let mut ex = Exchange::new(Message::new("hello"));
        ex.input.set_header("active", Value::Bool(true));

        let result = dispatch(&mut svc, ex).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("HELLO"));
    }

    #[tokio::test]
    async fn test_filter_blocks_non_matching_exchange() {
        let mut svc = FilterService::new(
            |ex: &Exchange| ex.input.header("active").is_some(),
            uppercase_body(),
        );

        // No "active" header is set, so the body must come back unchanged.
        let ex = Exchange::new(Message::new("hello"));

        let result = dispatch(&mut svc, ex).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("hello"));
    }

    #[tokio::test]
    async fn test_filter_sub_pipeline_transforms_body() {
        let mut svc = FilterService::new(|_: &Exchange| true, uppercase_body());
        let ex = Exchange::new(Message::new("world"));

        let result = dispatch(&mut svc, ex).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("WORLD"));
    }

    #[tokio::test]
    async fn test_filter_sub_pipeline_error_propagates() {
        let mut svc = FilterService::new(|_: &Exchange| true, failing());
        let ex = Exchange::new(Message::new("x"));

        let result = dispatch(&mut svc, ex).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("boom"));
    }

    #[tokio::test]
    async fn test_filter_predicate_receives_original_exchange() {
        let mut svc = FilterService::new(
            |ex: &Exchange| ex.input.body.as_text() == Some("check"),
            uppercase_body(),
        );
        let ex = Exchange::new(Message::new("check"));

        let result = dispatch(&mut svc, ex).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("CHECK"));
    }

    #[tokio::test]
    async fn test_filter_clone_is_independent() {
        let svc = FilterService::new(|_: &Exchange| true, passthrough());
        let mut clone = svc.clone();
        let ex = Exchange::new(Message::new("hi"));

        let result = dispatch(&mut clone, ex).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("hi"));
    }
}