camel_processor/
validate.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use camel_api::{CamelError, Exchange, FilterPredicate};
7use tower::Service;
8
9#[derive(Clone)]
14pub struct ValidateService {
15 predicate: FilterPredicate,
16 expression_source: String,
17}
18
19impl ValidateService {
20 pub fn new(
22 predicate: impl Fn(&Exchange) -> bool + Send + Sync + 'static,
23 expression_source: impl Into<String>,
24 ) -> Self {
25 Self {
26 predicate: Arc::new(predicate),
27 expression_source: expression_source.into(),
28 }
29 }
30
31 pub fn from_predicate(
33 predicate: FilterPredicate,
34 expression_source: impl Into<String>,
35 ) -> Self {
36 Self {
37 predicate,
38 expression_source: expression_source.into(),
39 }
40 }
41}
42
43impl Service<Exchange> for ValidateService {
44 type Response = Exchange;
45 type Error = CamelError;
46 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
47
48 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49 Poll::Ready(Ok(()))
50 }
51
52 fn call(&mut self, exchange: Exchange) -> Self::Future {
53 if (self.predicate)(&exchange) {
54 Box::pin(async move { Ok(exchange) })
55 } else {
56 let source = self.expression_source.clone();
57 Box::pin(async move {
58 Err(CamelError::ValidationError(format!(
59 "validate('{source}'): predicate returned false",
60 )))
61 })
62 }
63 }
64}
65
66#[cfg(test)]
67mod tests {
68 use super::*;
69 use camel_api::Message;
70
71 #[tokio::test]
75 async fn test_validate_passing_predicate_returns_ok() {
76 let mut svc = ValidateService::new(|_ex: &Exchange| true, "true predicate");
77 let ex = Exchange::new(Message::new("hello"));
78 let result = svc.call(ex).await;
79 assert!(result.is_ok());
80 assert_eq!(result.unwrap().input.body.as_text(), Some("hello"));
81 }
82
83 #[tokio::test]
85 async fn test_validate_failing_predicate_returns_err() {
86 let mut svc = ValidateService::new(|_ex: &Exchange| false, "false predicate");
87 let ex = Exchange::new(Message::new("hello"));
88 let result = svc.call(ex).await;
89 assert!(result.is_err());
90 match result.unwrap_err() {
91 CamelError::ValidationError(msg) => {
92 assert!(
93 msg.contains("false predicate"),
94 "error message should contain expression source, got: {msg}"
95 );
96 }
97 other => panic!("expected ValidationError, got: {other:?}"),
98 }
99 }
100
101 #[tokio::test]
103 async fn test_validate_predicate_evaluates_body() {
104 let mut svc = ValidateService::new(
105 |ex: &Exchange| ex.input.body.as_text().map_or(false, |s| s.len() > 3),
106 "body length > 3",
107 );
108 let short = Exchange::new(Message::new("ab"));
109 let long = Exchange::new(Message::new("abcdef"));
110
111 assert!(svc.call(short).await.is_err());
112 assert!(svc.call(long).await.is_ok());
113 }
114
115 #[tokio::test]
117 async fn test_validate_clone_is_independent() {
118 let svc = ValidateService::new(|_ex: &Exchange| true, "true predicate");
119 let mut cloned = svc.clone();
120 let ex = Exchange::new(Message::new("hi"));
121 let result = cloned.call(ex).await;
122 assert!(result.is_ok());
123 }
124
125 #[tokio::test]
127 async fn test_validate_poll_ready() {
128 let mut svc = ValidateService::new(|_ex: &Exchange| true, "true predicate");
129 let poll = svc.poll_ready(&mut Context::from_waker(futures::task::noop_waker_ref()));
130 assert!(poll.is_ready());
131 match poll {
133 std::task::Poll::Ready(Ok(())) => {}
134 other => panic!("expected Ready(Ok(())), got: {other:?}"),
135 }
136 }
137}