// camel_processor/delayer.rs
use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use tower::Service;
7
8use camel_api::{CamelError, DelayConfig, Exchange, Value};
9
10#[derive(Clone)]
11pub struct DelayerService {
12 config: DelayConfig,
13}
14
15impl DelayerService {
16 pub fn new(config: DelayConfig) -> Self {
17 Self { config }
18 }
19
20 fn effective_delay_ms(&self, exchange: &Exchange) -> u64 {
21 if let Some(ref header) = self.config.dynamic_header
22 && let Some(Value::Number(n)) = exchange.input.header(header)
23 && let Some(n) = n.as_f64()
24 && n >= 0.0
25 {
26 return n as u64;
27 }
28 self.config.delay_ms
29 }
30}
31
32impl Service<Exchange> for DelayerService {
33 type Response = Exchange;
34 type Error = CamelError;
35 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
36
37 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38 Poll::Ready(Ok(()))
39 }
40
41 fn call(&mut self, exchange: Exchange) -> Self::Future {
42 let delay_ms = self.effective_delay_ms(&exchange);
43 Box::pin(async move {
44 if delay_ms > 0 {
45 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
46 }
47 Ok(exchange)
48 })
49 }
50}
51
/// Wall-clock tests for [`DelayerService`].
///
/// Each test sends one exchange through the service and checks how long the call took.
/// The lower bounds sit slightly below the configured delay and the upper bounds well
/// above the expected one, presumably to tolerate timer granularity and scheduling jitter.
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use camel_api::DelayConfig;
    use camel_api::{Exchange, Message, Value};
    use tower::{Service, ServiceExt};

    use super::DelayerService;

    /// Header name the dynamic-delay tests configure and populate.
    const DELAY_HEADER: &str = "CamelDelayMs";

    /// Builds an exchange with a fixed body and no headers.
    fn plain_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Builds an exchange whose input carries `value` under [`DELAY_HEADER`].
    fn exchange_with_delay_header(value: Value) -> Exchange {
        let mut ex = plain_exchange();
        ex.input.set_header(DELAY_HEADER, value);
        ex
    }

    /// Runs `exchange` through a fresh service built from `config`, asserts that the call
    /// succeeded, and returns how long the call took.
    async fn timed_call(config: DelayConfig, exchange: Exchange) -> Duration {
        let mut svc = DelayerService::new(config);

        let start = Instant::now();
        let result = svc.ready().await.unwrap().call(exchange).await;
        let elapsed = start.elapsed();

        assert!(result.is_ok(), "delayer should hand the exchange back");
        elapsed
    }

    #[tokio::test]
    async fn test_delayer_fixed_delay() {
        let elapsed = timed_call(DelayConfig::new(100), plain_exchange()).await;

        assert!(
            elapsed >= Duration::from_millis(90),
            "fixed 100ms delay finished too early: {elapsed:?}"
        );
    }

    #[tokio::test]
    async fn test_delayer_dynamic_from_header() {
        let config = DelayConfig::new(5000).with_dynamic_header(DELAY_HEADER);
        let ex = exchange_with_delay_header(Value::Number(50.into()));

        let elapsed = timed_call(config, ex).await;

        // The 50ms header must win over the 5000ms default.
        assert!(
            elapsed >= Duration::from_millis(40),
            "header delay finished too early: {elapsed:?}"
        );
        assert!(
            elapsed < Duration::from_millis(200),
            "header delay was not applied: {elapsed:?}"
        );
    }

    #[tokio::test]
    async fn test_delayer_zero_delay() {
        let elapsed = timed_call(DelayConfig::new(0), plain_exchange()).await;

        assert!(
            elapsed < Duration::from_millis(50),
            "zero delay should not wait: {elapsed:?}"
        );
    }

    #[tokio::test]
    async fn test_delayer_zero_from_header() {
        let config = DelayConfig::new(5000).with_dynamic_header(DELAY_HEADER);
        let ex = exchange_with_delay_header(Value::Number(0.into()));

        let elapsed = timed_call(config, ex).await;

        // A header value of zero overrides the 5000ms default.
        assert!(
            elapsed < Duration::from_millis(50),
            "zero header delay should not wait: {elapsed:?}"
        );
    }

    #[tokio::test]
    async fn test_delayer_fallback_on_invalid_header() {
        let config = DelayConfig::new(80).with_dynamic_header(DELAY_HEADER);
        let ex = exchange_with_delay_header(Value::String("not a number".into()));

        let elapsed = timed_call(config, ex).await;

        // A non-numeric header is ignored, so the configured 80ms applies.
        assert!(
            elapsed >= Duration::from_millis(70),
            "fallback delay finished too early: {elapsed:?}"
        );
    }

    #[tokio::test]
    async fn test_delayer_negative_header() {
        let config = DelayConfig::new(80).with_dynamic_header(DELAY_HEADER);
        let ex = exchange_with_delay_header(Value::Number((-100).into()));

        let elapsed = timed_call(config, ex).await;

        // A negative header is ignored, so the configured 80ms applies.
        assert!(
            elapsed >= Duration::from_millis(70),
            "fallback delay finished too early: {elapsed:?}"
        );
    }

    #[tokio::test]
    async fn test_delayer_missing_header() {
        let config = DelayConfig::new(80).with_dynamic_header(DELAY_HEADER);

        let elapsed = timed_call(config, plain_exchange()).await;

        // With no header on the exchange, the configured 80ms applies.
        assert!(
            elapsed >= Duration::from_millis(70),
            "fallback delay finished too early: {elapsed:?}"
        );
    }
}