camel_processor/
filter.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
9
10#[derive(Clone)]
15pub struct FilterService {
16 predicate: FilterPredicate,
17 sub_pipeline: BoxProcessor,
18}
19
20impl FilterService {
21 pub fn new(
23 predicate: impl Fn(&Exchange) -> bool + Send + Sync + 'static,
24 sub_pipeline: BoxProcessor,
25 ) -> Self {
26 Self {
27 predicate: Arc::new(predicate),
28 sub_pipeline,
29 }
30 }
31
32 pub fn from_predicate(predicate: FilterPredicate, sub_pipeline: BoxProcessor) -> Self {
34 Self {
35 predicate,
36 sub_pipeline,
37 }
38 }
39}
40
41impl Service<Exchange> for FilterService {
42 type Response = Exchange;
43 type Error = CamelError;
44 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
45
46 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47 self.sub_pipeline.poll_ready(cx)
48 }
49
50 fn call(&mut self, exchange: Exchange) -> Self::Future {
51 if (self.predicate)(&exchange) {
52 let fut = self.sub_pipeline.call(exchange);
53 Box::pin(fut)
54 } else {
55 Box::pin(async move { Ok(exchange) })
56 }
57 }
58}
59
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Body, BoxProcessorExt, Message, Value};
    use tower::ServiceExt;

    // ---- sub-pipeline fixtures -------------------------------------------

    /// Sub-pipeline that hands the exchange back unchanged.
    fn passthrough() -> BoxProcessor {
        BoxProcessor::from_fn(|exchange| Box::pin(async move { Ok(exchange) }))
    }

    /// Sub-pipeline that upper-cases a text body and leaves other bodies alone.
    fn uppercase_body() -> BoxProcessor {
        BoxProcessor::from_fn(|mut exchange: Exchange| {
            Box::pin(async move {
                let upper = if let Body::Text(text) = &exchange.input.body {
                    Some(text.to_uppercase())
                } else {
                    None
                };
                if let Some(upper) = upper {
                    exchange.input.body = Body::Text(upper);
                }
                Ok(exchange)
            })
        })
    }

    /// Sub-pipeline that always fails with a `ProcessorError("boom")`.
    fn failing() -> BoxProcessor {
        BoxProcessor::from_fn(|_exchange| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    // ---- shared helpers --------------------------------------------------

    /// Predicate used by the header-based tests: true when the `active`
    /// header is present, regardless of its value.
    fn has_active_header(exchange: &Exchange) -> bool {
        exchange.input.header("active").is_some()
    }

    /// Builds an exchange whose input message carries `text` as its body.
    fn exchange_with_body(text: &'static str) -> Exchange {
        Exchange::new(Message::new(text))
    }

    /// Waits for the service to become ready (panicking if readiness fails)
    /// and then sends one exchange through it.
    async fn dispatch(svc: &mut FilterService, exchange: Exchange) -> Result<Exchange, CamelError> {
        svc.ready().await.unwrap().call(exchange).await
    }

    // ---- tests -----------------------------------------------------------

    #[tokio::test]
    async fn test_filter_passes_matching_exchange() {
        let mut svc = FilterService::new(has_active_header, uppercase_body());

        let mut exchange = exchange_with_body("hello");
        exchange.input.set_header("active", Value::Bool(true));

        let result = dispatch(&mut svc, exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("HELLO"));
    }

    #[tokio::test]
    async fn test_filter_blocks_non_matching_exchange() {
        let mut svc = FilterService::new(has_active_header, uppercase_body());

        // No `active` header: the sub-pipeline must not touch the body.
        let result = dispatch(&mut svc, exchange_with_body("hello"))
            .await
            .unwrap();
        assert_eq!(result.input.body.as_text(), Some("hello"));
    }

    #[tokio::test]
    async fn test_filter_sub_pipeline_transforms_body() {
        let mut svc = FilterService::new(|_: &Exchange| true, uppercase_body());

        let result = dispatch(&mut svc, exchange_with_body("world"))
            .await
            .unwrap();
        assert_eq!(result.input.body.as_text(), Some("WORLD"));
    }

    #[tokio::test]
    async fn test_filter_sub_pipeline_error_propagates() {
        let mut svc = FilterService::new(|_: &Exchange| true, failing());

        let result = dispatch(&mut svc, exchange_with_body("x")).await;
        assert!(result.is_err());

        let message = result.unwrap_err().to_string();
        assert!(message.contains("boom"));
    }

    #[tokio::test]
    async fn test_filter_predicate_receives_original_exchange() {
        // The predicate matches on the lower-case body, so it must run
        // before the sub-pipeline upper-cases it.
        let mut svc = FilterService::new(
            |exchange: &Exchange| exchange.input.body.as_text() == Some("check"),
            uppercase_body(),
        );

        let result = dispatch(&mut svc, exchange_with_body("check"))
            .await
            .unwrap();
        assert_eq!(result.input.body.as_text(), Some("CHECK"));
    }

    #[tokio::test]
    async fn test_filter_clone_is_independent() {
        let original = FilterService::new(|_: &Exchange| true, passthrough());
        let mut cloned = original.clone();

        let result = dispatch(&mut cloned, exchange_with_body("hi"))
            .await
            .unwrap();
        assert_eq!(result.input.body.as_text(), Some("hi"));
    }
}