camel_component_timer/
lib.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::time;
5use tracing::debug;
6
7use camel_api::{BoxProcessor, CamelError, Exchange, Message};
8use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
9use camel_endpoint::UriConfig;
10
11#[derive(Debug, Clone, UriConfig)]
19#[uri_scheme = "timer"]
20pub struct TimerConfig {
21 pub name: String,
23
24 #[allow(dead_code)] #[uri_param(name = "period", default = "1000")]
27 period_ms: u64,
28
29 pub period: Duration,
31
32 #[allow(dead_code)] #[uri_param(name = "delay", default = "0")]
35 delay_ms: u64,
36
37 pub delay: Duration,
39
40 #[uri_param(name = "repeatCount")]
42 pub repeat_count: Option<u32>,
43}
44
/// Component registered under the `timer` scheme.
///
/// It is a stateless unit type; all per-timer settings live in the endpoint it creates.
pub struct TimerComponent;

impl TimerComponent {
    /// Creates the component.
    pub fn new() -> Self {
        TimerComponent
    }
}

impl Default for TimerComponent {
    /// Equivalent to [`TimerComponent::new`].
    fn default() -> Self {
        TimerComponent
    }
}
63
64impl Component for TimerComponent {
65 fn scheme(&self) -> &str {
66 "timer"
67 }
68
69 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
70 let config = TimerConfig::from_uri(uri)?;
71 Ok(Box::new(TimerEndpoint {
72 uri: uri.to_string(),
73 config,
74 }))
75 }
76}
77
78struct TimerEndpoint {
83 uri: String,
84 config: TimerConfig,
85}
86
87impl Endpoint for TimerEndpoint {
88 fn uri(&self) -> &str {
89 &self.uri
90 }
91
92 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
93 Ok(Box::new(TimerConsumer {
94 config: self.config.clone(),
95 }))
96 }
97
98 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
99 Err(CamelError::EndpointCreationFailed(
100 "timer endpoint does not support producers".to_string(),
101 ))
102 }
103}
104
105struct TimerConsumer {
110 config: TimerConfig,
111}
112
113#[async_trait]
114impl Consumer for TimerConsumer {
115 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
116 let config = self.config.clone();
117
118 if !config.delay.is_zero() {
120 tokio::select! {
121 _ = time::sleep(config.delay) => {}
122 _ = context.cancelled() => {
123 debug!(timer = config.name, "Timer cancelled during initial delay");
124 return Ok(());
125 }
126 }
127 }
128
129 let mut interval = time::interval(config.period);
130 let mut count: u32 = 0;
131
132 loop {
133 tokio::select! {
134 _ = context.cancelled() => {
135 debug!(timer = config.name, "Timer received cancellation, stopping");
136 break;
137 }
138 _ = interval.tick() => {
139 count += 1;
140
141 debug!(timer = config.name, count, "Timer tick");
142
143 let mut exchange = Exchange::new(Message::new(format!(
144 "timer://{} tick #{}",
145 config.name, count
146 )));
147 exchange.input.set_header(
148 "CamelTimerName",
149 serde_json::Value::String(config.name.clone()),
150 );
151 exchange
152 .input
153 .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
154
155 if context.send(exchange).await.is_err() {
156 break;
158 }
159
160 if let Some(max) = config.repeat_count
161 && count >= max
162 {
163 break;
164 }
165 }
166 }
167 }
168
169 Ok(())
170 }
171
172 async fn stop(&mut self) -> Result<(), CamelError> {
173 Ok(())
174 }
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare `timer:<name>` URI falls back to the default period, delay and repeat count.
    #[test]
    fn test_timer_config_defaults() {
        let parsed = TimerConfig::from_uri("timer:tick").unwrap();

        assert_eq!(parsed.name, "tick");
        assert_eq!(parsed.period, Duration::from_millis(1000));
        assert_eq!(parsed.delay, Duration::from_millis(0));
        assert_eq!(parsed.repeat_count, None);
    }

    /// Explicit query parameters override every default.
    #[test]
    fn test_timer_config_with_params() {
        let uri = "timer:myTimer?period=500&delay=100&repeatCount=5";
        let parsed = TimerConfig::from_uri(uri).unwrap();

        assert_eq!(parsed.name, "myTimer");
        assert_eq!(parsed.period, Duration::from_millis(500));
        assert_eq!(parsed.delay, Duration::from_millis(100));
        assert_eq!(parsed.repeat_count, Some(5));
    }

    /// A URI with a different scheme is rejected.
    #[test]
    fn test_timer_config_wrong_scheme() {
        assert!(TimerConfig::from_uri("log:info").is_err());
    }

    /// The component reports the `timer` scheme.
    #[test]
    fn test_timer_component_scheme() {
        assert_eq!(TimerComponent::new().scheme(), "timer");
    }

    /// A well-formed timer URI produces an endpoint.
    #[test]
    fn test_timer_component_creates_endpoint() {
        let outcome = TimerComponent::new().create_endpoint("timer:tick?period=1000");
        assert!(outcome.is_ok());
    }

    /// Timer endpoints refuse to create producers.
    #[test]
    fn test_timer_endpoint_no_producer() {
        let ctx = ProducerContext::new();
        let endpoint = TimerComponent::new()
            .create_endpoint("timer:tick")
            .unwrap();

        assert!(endpoint.create_producer(&ctx).is_err());
    }

    /// With `repeatCount=3` the consumer delivers three exchanges, the first of which
    /// carries the timer name and a counter of 1 in its headers.
    #[tokio::test]
    async fn test_timer_consumer_fires() {
        let mut consumer = TimerComponent::new()
            .create_endpoint("timer:test?period=50&repeatCount=3")
            .unwrap()
            .create_consumer()
            .unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());

        // Run the consumer in the background while this task drains the channel.
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        let mut received = Vec::new();
        while received.len() < 3 {
            match rx.recv().await {
                Some(envelope) => received.push(envelope.exchange),
                None => break,
            }
        }

        assert_eq!(received.len(), 3);

        let first = &received[0];
        let expected_name = serde_json::Value::String("test".into());
        let expected_counter = serde_json::Value::Number(1.into());
        assert_eq!(first.input.header("CamelTimerName"), Some(&expected_name));
        assert_eq!(
            first.input.header("CamelTimerCounter"),
            Some(&expected_counter)
        );
    }

    /// Cancelling the token stops a running consumer promptly, after it has already
    /// delivered some exchanges.
    #[tokio::test]
    async fn test_timer_consumer_respects_cancellation() {
        use tokio_util::sync::CancellationToken;

        let token = CancellationToken::new();
        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone());

        let config = TimerConfig::from_uri("timer:cancel-test?period=50").unwrap();
        let mut consumer = TimerConsumer { config };

        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Let a few 50 ms ticks go by before asking the consumer to stop.
        tokio::time::sleep(Duration::from_millis(180)).await;
        token.cancel();

        let joined = tokio::time::timeout(Duration::from_secs(1), handle).await;
        assert!(
            joined.is_ok(),
            "Consumer should have stopped after cancellation"
        );

        // Count whatever was delivered before the cancellation took effect.
        let count = std::iter::from_fn(|| rx.try_recv().ok()).count();
        assert!(
            count >= 2,
            "Expected at least 2 exchanges before cancellation, got {count}"
        );
    }
}