camel_processor/
delayer.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use tower::Service;
7
8use camel_api::{CamelError, DelayConfig, Exchange, Value};
9
10#[derive(Clone)]
11pub struct DelayerService {
12 config: DelayConfig,
13}
14
15impl DelayerService {
16 pub fn new(config: DelayConfig) -> Self {
17 Self { config }
18 }
19
20 fn effective_delay_ms(&self, exchange: &Exchange) -> u64 {
21 if let Some(ref header) = self.config.dynamic_header
22 && let Some(Value::Number(n)) = exchange.input.header(header)
23 && let Some(n) = n.as_f64()
24 && n >= 0.0
25 {
26 return n as u64;
27 }
28 self.config.delay_ms
29 }
30}
31
32impl Service<Exchange> for DelayerService {
33 type Response = Exchange;
34 type Error = CamelError;
35 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
36
37 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38 Poll::Ready(Ok(()))
39 }
40
41 fn call(&mut self, exchange: Exchange) -> Self::Future {
42 let delay_ms = self.effective_delay_ms(&exchange);
43 Box::pin(async move {
44 if delay_ms > 0 {
45 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
46 }
47 Ok(exchange)
48 })
49 }
50}
51
52#[cfg(test)]
53mod tests {
54 use std::time::{Duration, Instant};
55
56 use camel_api::DelayConfig;
57 use camel_api::{Exchange, Message, Value};
58 use tower::{Service, ServiceExt};
59
60 use super::DelayerService;
61
62 #[tokio::test]
63 async fn test_delayer_fixed_delay() {
64 let config = DelayConfig::new(100);
65 let mut svc = DelayerService::new(config);
66
67 let start = Instant::now();
68 let ex = Exchange::new(Message::new("test"));
69 let result = svc.ready().await.unwrap().call(ex).await;
70 let elapsed = start.elapsed();
71
72 assert!(result.is_ok());
73 assert!(elapsed >= Duration::from_millis(90));
74 }
75
76 #[tokio::test]
77 async fn test_delayer_dynamic_from_header() {
78 let config = DelayConfig::new(5000).with_dynamic_header("CamelDelayMs");
79 let mut svc = DelayerService::new(config);
80
81 let mut ex = Exchange::new(Message::new("test"));
82 ex.input.set_header("CamelDelayMs", Value::Number(50.into()));
83
84 let start = Instant::now();
85 let result = svc.ready().await.unwrap().call(ex).await;
86 let elapsed = start.elapsed();
87
88 assert!(result.is_ok());
89 assert!(elapsed >= Duration::from_millis(40));
90 assert!(elapsed < Duration::from_millis(200));
91 }
92
93 #[tokio::test]
94 async fn test_delayer_zero_delay() {
95 let config = DelayConfig::new(0);
96 let mut svc = DelayerService::new(config);
97
98 let start = Instant::now();
99 let ex = Exchange::new(Message::new("test"));
100 let result = svc.ready().await.unwrap().call(ex).await;
101 let elapsed = start.elapsed();
102
103 assert!(result.is_ok());
104 assert!(elapsed < Duration::from_millis(50));
105 }
106
107 #[tokio::test]
108 async fn test_delayer_zero_from_header() {
109 let config = DelayConfig::new(5000).with_dynamic_header("CamelDelayMs");
110 let mut svc = DelayerService::new(config);
111
112 let mut ex = Exchange::new(Message::new("test"));
113 ex.input.set_header("CamelDelayMs", Value::Number(0.into()));
114
115 let start = Instant::now();
116 let result = svc.ready().await.unwrap().call(ex).await;
117 let elapsed = start.elapsed();
118
119 assert!(result.is_ok());
120 assert!(elapsed < Duration::from_millis(50));
121 }
122
123 #[tokio::test]
124 async fn test_delayer_fallback_on_invalid_header() {
125 let config = DelayConfig::new(80).with_dynamic_header("CamelDelayMs");
126 let mut svc = DelayerService::new(config);
127
128 let mut ex = Exchange::new(Message::new("test"));
129 ex.input.set_header("CamelDelayMs", Value::String("not a number".into()));
130
131 let start = Instant::now();
132 let result = svc.ready().await.unwrap().call(ex).await;
133 let elapsed = start.elapsed();
134
135 assert!(result.is_ok());
136 assert!(elapsed >= Duration::from_millis(70));
137 }
138
139 #[tokio::test]
140 async fn test_delayer_negative_header() {
141 let config = DelayConfig::new(80).with_dynamic_header("CamelDelayMs");
142 let mut svc = DelayerService::new(config);
143
144 let mut ex = Exchange::new(Message::new("test"));
145 ex.input.set_header("CamelDelayMs", Value::Number((-100).into()));
146
147 let start = Instant::now();
148 let result = svc.ready().await.unwrap().call(ex).await;
149 let elapsed = start.elapsed();
150
151 assert!(result.is_ok());
152 assert!(elapsed >= Duration::from_millis(70));
153 }
154
155 #[tokio::test]
156 async fn test_delayer_missing_header() {
157 let config = DelayConfig::new(80).with_dynamic_header("CamelDelayMs");
158 let mut svc = DelayerService::new(config);
159
160 let start = Instant::now();
161 let ex = Exchange::new(Message::new("test"));
162 let result = svc.ready().await.unwrap().call(ex).await;
163 let elapsed = start.elapsed();
164
165 assert!(result.is_ok());
166 assert!(elapsed >= Duration::from_millis(70));
167 }
168}