Skip to main content

camel_processor/
validate.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use camel_api::{CamelError, Exchange, FilterPredicate};
7use tower::Service;
8
/// Tower Service implementing the Validate EIP.
///
/// If the predicate returns `true`, the exchange continues (returned as `Ok`).
/// If `false`, a `CamelError::ValidationError` is returned.
///
/// The type derives `Clone`, so a service can be duplicated and each copy
/// called on its own.
#[derive(Clone)]
pub struct ValidateService {
    /// Check applied to every exchange passed to `call`; it receives a shared
    /// reference to the exchange and yields a `bool`.
    predicate: FilterPredicate,
    /// Human-readable description of the predicate. It is only used to build
    /// the validation error message when the predicate rejects an exchange.
    expression_source: String,
}
18
19impl ValidateService {
20    /// Create from a closure predicate and an expression source string (for error messages).
21    pub fn new(
22        predicate: impl Fn(&Exchange) -> bool + Send + Sync + 'static,
23        expression_source: impl Into<String>,
24    ) -> Self {
25        Self {
26            predicate: Arc::new(predicate),
27            expression_source: expression_source.into(),
28        }
29    }
30
31    /// Create from a pre-boxed `FilterPredicate` (used by `resolve_steps`).
32    pub fn from_predicate(
33        predicate: FilterPredicate,
34        expression_source: impl Into<String>,
35    ) -> Self {
36        Self {
37            predicate,
38            expression_source: expression_source.into(),
39        }
40    }
41}
42
43impl Service<Exchange> for ValidateService {
44    type Response = Exchange;
45    type Error = CamelError;
46    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
47
48    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49        Poll::Ready(Ok(()))
50    }
51
52    fn call(&mut self, exchange: Exchange) -> Self::Future {
53        if (self.predicate)(&exchange) {
54            Box::pin(async move { Ok(exchange) })
55        } else {
56            let source = self.expression_source.clone();
57            Box::pin(async move {
58                Err(CamelError::ValidationError(format!(
59                    "validate('{source}'): predicate returned false",
60                )))
61            })
62        }
63    }
64}
65
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::Message;

    // ── ValidateService tests ──

    // 1. An accepting predicate hands the exchange back as Ok
    #[tokio::test]
    async fn test_validate_passing_predicate_returns_ok() {
        let mut validator = ValidateService::new(|_: &Exchange| true, "true predicate");
        let outcome = validator.call(Exchange::new(Message::new("hello"))).await;
        assert!(outcome.is_ok());
        let passed = outcome.unwrap();
        assert_eq!(passed.input.body.as_text(), Some("hello"));
    }

    // 2. A rejecting predicate yields Err(ValidationError) naming the expression
    #[tokio::test]
    async fn test_validate_failing_predicate_returns_err() {
        let mut validator = ValidateService::new(|_: &Exchange| false, "false predicate");
        let outcome = validator.call(Exchange::new(Message::new("hello"))).await;
        assert!(outcome.is_err());
        let failure = outcome.unwrap_err();
        match failure {
            CamelError::ValidationError(msg) => {
                assert!(
                    msg.contains("false predicate"),
                    "error message should contain expression source, got: {msg}"
                );
            }
            other => panic!("expected ValidationError, got: {other:?}"),
        }
    }

    // 3. The predicate really inspects the exchange (body-based validation)
    #[tokio::test]
    async fn test_validate_predicate_evaluates_body() {
        let longer_than_three =
            |ex: &Exchange| matches!(ex.input.body.as_text(), Some(text) if text.len() > 3);
        let mut validator = ValidateService::new(longer_than_three, "body length > 3");

        let rejected = validator.call(Exchange::new(Message::new("ab"))).await;
        assert!(rejected.is_err());

        let accepted = validator.call(Exchange::new(Message::new("abcdef"))).await;
        assert!(accepted.is_ok());
    }

    // 4. A cloned ValidateService is usable on its own
    #[tokio::test]
    async fn test_validate_clone_is_independent() {
        let original = ValidateService::new(|_: &Exchange| true, "true predicate");
        let mut duplicate = original.clone();
        let outcome = duplicate.call(Exchange::new(Message::new("hi"))).await;
        assert!(outcome.is_ok());
    }

    // 5. poll_ready always reports Ready(Ok(()))
    #[tokio::test]
    async fn test_validate_poll_ready() {
        let mut validator = ValidateService::new(|_: &Exchange| true, "true predicate");
        let waker = futures::task::noop_waker_ref();
        let mut cx = Context::from_waker(waker);
        let readiness = validator.poll_ready(&mut cx);
        assert!(readiness.is_ready());
        // Look inside the Poll<Result<...>> to confirm the inner Ok(())
        match readiness {
            Poll::Ready(Ok(())) => {}
            other => panic!("expected Ready(Ok(())), got: {other:?}"),
        }
    }
}
137}