Skip to main content

camel_processor/
delayer.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use tower::Service;
7
8use camel_api::{CamelError, DelayConfig, Exchange, Value};
9
10#[derive(Clone)]
11pub struct DelayerService {
12    config: DelayConfig,
13}
14
15impl DelayerService {
16    pub fn new(config: DelayConfig) -> Self {
17        Self { config }
18    }
19
20    fn effective_delay_ms(&self, exchange: &Exchange) -> u64 {
21        if let Some(ref header) = self.config.dynamic_header
22            && let Some(Value::Number(n)) = exchange.input.header(header)
23            && let Some(n) = n.as_f64()
24            && n >= 0.0
25        {
26            return n as u64;
27        }
28        self.config.delay_ms
29    }
30}
31
32impl Service<Exchange> for DelayerService {
33    type Response = Exchange;
34    type Error = CamelError;
35    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
36
37    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38        Poll::Ready(Ok(()))
39    }
40
41    fn call(&mut self, exchange: Exchange) -> Self::Future {
42        let delay_ms = self.effective_delay_ms(&exchange);
43        Box::pin(async move {
44            if delay_ms > 0 {
45                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
46            }
47            Ok(exchange)
48        })
49    }
50}
51
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use camel_api::DelayConfig;
    use camel_api::{Exchange, Message, Value};
    use tower::{Service, ServiceExt};

    use super::DelayerService;

    /// Header name used for per-exchange delay overrides in these tests.
    const DELAY_HEADER: &str = "CamelDelayMs";

    /// Exchange with a fixed body and no headers.
    fn plain_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Exchange whose delay header is set to `value`.
    fn exchange_with_header(value: Value) -> Exchange {
        let mut exchange = plain_exchange();
        exchange.input.set_header(DELAY_HEADER, value);
        exchange
    }

    /// Config with the given fixed delay plus the dynamic override header enabled.
    fn dynamic_config(fallback_ms: u64) -> DelayConfig {
        DelayConfig::new(fallback_ms).with_dynamic_header(DELAY_HEADER)
    }

    /// Runs one exchange through a fresh service, asserts it succeeded, and
    /// returns the wall-clock time the call took.
    async fn timed_call(config: DelayConfig, exchange: Exchange) -> Duration {
        let mut svc = DelayerService::new(config);

        let started = Instant::now();
        let outcome = svc.ready().await.unwrap().call(exchange).await;
        let elapsed = started.elapsed();

        assert!(outcome.is_ok());
        elapsed
    }

    #[tokio::test]
    async fn test_delayer_fixed_delay() {
        let elapsed = timed_call(DelayConfig::new(100), plain_exchange()).await;

        assert!(elapsed >= Duration::from_millis(90));
    }

    #[tokio::test]
    async fn test_delayer_dynamic_from_header() {
        let exchange = exchange_with_header(Value::Number(50.into()));
        let elapsed = timed_call(dynamic_config(5000), exchange).await;

        // The header (50 ms) must win over the configured 5000 ms.
        assert!(elapsed >= Duration::from_millis(40));
        assert!(elapsed < Duration::from_millis(200));
    }

    #[tokio::test]
    async fn test_delayer_zero_delay() {
        let elapsed = timed_call(DelayConfig::new(0), plain_exchange()).await;

        assert!(elapsed < Duration::from_millis(50));
    }

    #[tokio::test]
    async fn test_delayer_zero_from_header() {
        let exchange = exchange_with_header(Value::Number(0.into()));
        let elapsed = timed_call(dynamic_config(5000), exchange).await;

        // A zero header overrides the configured 5000 ms.
        assert!(elapsed < Duration::from_millis(50));
    }

    #[tokio::test]
    async fn test_delayer_fallback_on_invalid_header() {
        let exchange = exchange_with_header(Value::String("not a number".into()));
        let elapsed = timed_call(dynamic_config(80), exchange).await;

        // Non-numeric header: the configured 80 ms applies.
        assert!(elapsed >= Duration::from_millis(70));
    }

    #[tokio::test]
    async fn test_delayer_negative_header() {
        let exchange = exchange_with_header(Value::Number((-100).into()));
        let elapsed = timed_call(dynamic_config(80), exchange).await;

        // Negative header: the configured 80 ms applies.
        assert!(elapsed >= Duration::from_millis(70));
    }

    #[tokio::test]
    async fn test_delayer_missing_header() {
        let elapsed = timed_call(dynamic_config(80), plain_exchange()).await;

        // Header not present: the configured 80 ms applies.
        assert!(elapsed >= Duration::from_millis(70));
    }
}