Skip to main content

camel_processor/
delayer.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use tower::Service;
7
8use camel_api::{CamelError, DelayConfig, Exchange, Value};
9
10#[derive(Clone)]
11pub struct DelayerService {
12    config: DelayConfig,
13}
14
15impl DelayerService {
16    pub fn new(config: DelayConfig) -> Self {
17        Self { config }
18    }
19
20    fn effective_delay_ms(&self, exchange: &Exchange) -> u64 {
21        if let Some(ref header) = self.config.dynamic_header
22            && let Some(Value::Number(n)) = exchange.input.header(header)
23            && let Some(n) = n.as_f64()
24            && n >= 0.0
25        {
26            return n as u64;
27        }
28        self.config.delay_ms
29    }
30}
31
32impl Service<Exchange> for DelayerService {
33    type Response = Exchange;
34    type Error = CamelError;
35    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
36
37    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38        Poll::Ready(Ok(()))
39    }
40
41    fn call(&mut self, exchange: Exchange) -> Self::Future {
42        let delay_ms = self.effective_delay_ms(&exchange);
43        Box::pin(async move {
44            if delay_ms > 0 {
45                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
46            }
47            Ok(exchange)
48        })
49    }
50}
51
52#[cfg(test)]
53mod tests {
54    use std::time::{Duration, Instant};
55
56    use camel_api::DelayConfig;
57    use camel_api::{Exchange, Message, Value};
58    use tower::{Service, ServiceExt};
59
60    use super::DelayerService;
61
62    #[tokio::test]
63    async fn test_delayer_fixed_delay() {
64        let config = DelayConfig::new(100);
65        let mut svc = DelayerService::new(config);
66
67        let start = Instant::now();
68        let ex = Exchange::new(Message::new("test"));
69        let result = svc.ready().await.unwrap().call(ex).await;
70        let elapsed = start.elapsed();
71
72        assert!(result.is_ok());
73        assert!(elapsed >= Duration::from_millis(90));
74    }
75
76    #[tokio::test]
77    async fn test_delayer_dynamic_from_header() {
78        let config = DelayConfig::new(5000).with_dynamic_header("CamelDelayMs");
79        let mut svc = DelayerService::new(config);
80
81        let mut ex = Exchange::new(Message::new("test"));
82        ex.input.set_header("CamelDelayMs", Value::Number(50.into()));
83
84        let start = Instant::now();
85        let result = svc.ready().await.unwrap().call(ex).await;
86        let elapsed = start.elapsed();
87
88        assert!(result.is_ok());
89        assert!(elapsed >= Duration::from_millis(40));
90        assert!(elapsed < Duration::from_millis(200));
91    }
92
93    #[tokio::test]
94    async fn test_delayer_zero_delay() {
95        let config = DelayConfig::new(0);
96        let mut svc = DelayerService::new(config);
97
98        let start = Instant::now();
99        let ex = Exchange::new(Message::new("test"));
100        let result = svc.ready().await.unwrap().call(ex).await;
101        let elapsed = start.elapsed();
102
103        assert!(result.is_ok());
104        assert!(elapsed < Duration::from_millis(50));
105    }
106
107    #[tokio::test]
108    async fn test_delayer_zero_from_header() {
109        let config = DelayConfig::new(5000).with_dynamic_header("CamelDelayMs");
110        let mut svc = DelayerService::new(config);
111
112        let mut ex = Exchange::new(Message::new("test"));
113        ex.input.set_header("CamelDelayMs", Value::Number(0.into()));
114
115        let start = Instant::now();
116        let result = svc.ready().await.unwrap().call(ex).await;
117        let elapsed = start.elapsed();
118
119        assert!(result.is_ok());
120        assert!(elapsed < Duration::from_millis(50));
121    }
122
123    #[tokio::test]
124    async fn test_delayer_fallback_on_invalid_header() {
125        let config = DelayConfig::new(80).with_dynamic_header("CamelDelayMs");
126        let mut svc = DelayerService::new(config);
127
128        let mut ex = Exchange::new(Message::new("test"));
129        ex.input.set_header("CamelDelayMs", Value::String("not a number".into()));
130
131        let start = Instant::now();
132        let result = svc.ready().await.unwrap().call(ex).await;
133        let elapsed = start.elapsed();
134
135        assert!(result.is_ok());
136        assert!(elapsed >= Duration::from_millis(70));
137    }
138
139    #[tokio::test]
140    async fn test_delayer_negative_header() {
141        let config = DelayConfig::new(80).with_dynamic_header("CamelDelayMs");
142        let mut svc = DelayerService::new(config);
143
144        let mut ex = Exchange::new(Message::new("test"));
145        ex.input.set_header("CamelDelayMs", Value::Number((-100).into()));
146
147        let start = Instant::now();
148        let result = svc.ready().await.unwrap().call(ex).await;
149        let elapsed = start.elapsed();
150
151        assert!(result.is_ok());
152        assert!(elapsed >= Duration::from_millis(70));
153    }
154
155    #[tokio::test]
156    async fn test_delayer_missing_header() {
157        let config = DelayConfig::new(80).with_dynamic_header("CamelDelayMs");
158        let mut svc = DelayerService::new(config);
159
160        let start = Instant::now();
161        let ex = Exchange::new(Message::new("test"));
162        let result = svc.ready().await.unwrap().call(ex).await;
163        let elapsed = start.elapsed();
164
165        assert!(result.is_ok());
166        assert!(elapsed >= Duration::from_millis(70));
167    }
168}