Skip to main content

camel_processor/
stop.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{CamelError, Exchange};
8
/// Tower `Service` implementing the Stop EIP.
///
/// Every call resolves to `Err(CamelError::Stopped)`, signalling the pipeline to halt
/// processing for this exchange. The pipeline captures `Stopped` and returns
/// the exchange as-is rather than propagating the error upstream.
///
/// This is a field-less unit struct: it carries no configuration or state, so a
/// clone or a `Default`-constructed value is indistinguishable from the original.
/// `Debug` is derived so the service can be logged or embedded in types that
/// themselves derive `Debug`.
#[derive(Clone, Debug, Default)]
pub struct StopService;
16
17impl Service<Exchange> for StopService {
18    type Response = Exchange;
19    type Error = CamelError;
20    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
21
22    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
23        Poll::Ready(Ok(()))
24    }
25
26    fn call(&mut self, _exchange: Exchange) -> Self::Future {
27        Box::pin(async { Err(CamelError::Stopped) })
28    }
29}
30
#[cfg(test)]
mod tests {
    use camel_api::{CamelError, Exchange, Message};
    use tower::{Service, ServiceExt};

    use super::StopService;

    /// Waits for the service to report ready, sends `ex` through it, and
    /// returns whatever the service produced.
    async fn run(svc: &mut StopService, ex: Exchange) -> Result<Exchange, CamelError> {
        let ready_svc = svc.ready().await.unwrap();
        ready_svc.call(ex).await
    }

    // The outcome is Err(CamelError::Stopped) no matter what the exchange carries.
    #[tokio::test]
    async fn test_stop_service_always_returns_stopped() {
        let mut service = StopService;
        let exchange = Exchange::new(Message::new("anything"));
        let outcome = run(&mut service, exchange).await;
        assert!(matches!(outcome, Err(CamelError::Stopped)));
    }

    // A cloned StopService behaves exactly like the original.
    #[tokio::test]
    async fn test_stop_service_is_clone() {
        let service = StopService;
        let mut duplicate = service.clone();
        let exchange = Exchange::new(Message::new("x"));
        let outcome = run(&mut duplicate, exchange).await;
        assert!(matches!(outcome, Err(CamelError::Stopped)));
    }
}