camel_component_timer/
lib.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::time;
5use tracing::debug;
6
7use camel_component_api::UriConfig;
8use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
9use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
10
11#[derive(Debug, Clone, UriConfig)]
19#[uri_scheme = "timer"]
20#[uri_config(crate = "camel_component_api")]
21pub struct TimerConfig {
22 pub name: String,
24
25 #[allow(dead_code)] #[uri_param(name = "period", default = "1000")]
28 period_ms: u64,
29
30 pub period: Duration,
32
33 #[allow(dead_code)] #[uri_param(name = "delay", default = "0")]
36 delay_ms: u64,
37
38 pub delay: Duration,
40
41 #[uri_param(name = "repeatCount")]
43 pub repeat_count: Option<u32>,
44}
45
/// Stateless component that serves the `timer` URI scheme.
pub struct TimerComponent;

impl TimerComponent {
    /// Creates the timer component. It carries no state.
    pub fn new() -> Self {
        TimerComponent
    }
}

impl Default for TimerComponent {
    /// Equivalent to [`TimerComponent::new`].
    fn default() -> Self {
        TimerComponent
    }
}
64
65impl Component for TimerComponent {
66 fn scheme(&self) -> &str {
67 "timer"
68 }
69
70 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
71 let config = TimerConfig::from_uri(uri)?;
72 Ok(Box::new(TimerEndpoint {
73 uri: uri.to_string(),
74 config,
75 }))
76 }
77}
78
79struct TimerEndpoint {
84 uri: String,
85 config: TimerConfig,
86}
87
88impl Endpoint for TimerEndpoint {
89 fn uri(&self) -> &str {
90 &self.uri
91 }
92
93 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
94 Ok(Box::new(TimerConsumer {
95 config: self.config.clone(),
96 }))
97 }
98
99 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
100 Err(CamelError::EndpointCreationFailed(
101 "timer endpoint does not support producers".to_string(),
102 ))
103 }
104}
105
106struct TimerConsumer {
111 config: TimerConfig,
112}
113
114#[async_trait]
115impl Consumer for TimerConsumer {
116 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
117 let config = self.config.clone();
118
119 if !config.delay.is_zero() {
121 tokio::select! {
122 _ = time::sleep(config.delay) => {}
123 _ = context.cancelled() => {
124 debug!(timer = config.name, "Timer cancelled during initial delay");
125 return Ok(());
126 }
127 }
128 }
129
130 let mut interval = time::interval(config.period);
131 let mut count: u32 = 0;
132
133 loop {
134 tokio::select! {
135 _ = context.cancelled() => {
136 debug!(timer = config.name, "Timer received cancellation, stopping");
137 break;
138 }
139 _ = interval.tick() => {
140 count += 1;
141
142 debug!(timer = config.name, count, "Timer tick");
143
144 let mut exchange = Exchange::new(Message::new(format!(
145 "timer://{} tick #{}",
146 config.name, count
147 )));
148 exchange.input.set_header(
149 "CamelTimerName",
150 serde_json::Value::String(config.name.clone()),
151 );
152 exchange
153 .input
154 .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
155
156 if context.send(exchange).await.is_err() {
157 break;
159 }
160
161 if let Some(max) = config.repeat_count
162 && count >= max
163 {
164 break;
165 }
166 }
167 }
168 }
169
170 Ok(())
171 }
172
173 async fn stop(&mut self) -> Result<(), CamelError> {
174 Ok(())
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare `timer:<name>` URI falls back to the default period and delay.
    #[test]
    fn test_timer_config_defaults() {
        let cfg = TimerConfig::from_uri("timer:tick").unwrap();

        assert_eq!(cfg.name, "tick");
        assert_eq!(cfg.period, Duration::from_millis(1000));
        assert_eq!(cfg.delay, Duration::from_millis(0));
        assert_eq!(cfg.repeat_count, None);
    }

    /// Every supported query parameter is picked up from the URI.
    #[test]
    fn test_timer_config_with_params() {
        let uri = "timer:myTimer?period=500&delay=100&repeatCount=5";
        let cfg = TimerConfig::from_uri(uri).unwrap();

        assert_eq!(cfg.name, "myTimer");
        assert_eq!(cfg.period, Duration::from_millis(500));
        assert_eq!(cfg.delay, Duration::from_millis(100));
        assert_eq!(cfg.repeat_count, Some(5));
    }

    /// URIs belonging to another scheme are rejected.
    #[test]
    fn test_timer_config_wrong_scheme() {
        assert!(TimerConfig::from_uri("log:info").is_err());
    }

    /// The component advertises the `timer` scheme.
    #[test]
    fn test_timer_component_scheme() {
        assert_eq!(TimerComponent::new().scheme(), "timer");
    }

    /// A valid timer URI produces an endpoint.
    #[test]
    fn test_timer_component_creates_endpoint() {
        let created = TimerComponent::new().create_endpoint("timer:tick?period=1000");
        assert!(created.is_ok());
    }

    /// Timer endpoints refuse to create producers.
    #[test]
    fn test_timer_endpoint_no_producer() {
        let ctx = ProducerContext::new();
        let endpoint = TimerComponent::new()
            .create_endpoint("timer:tick")
            .unwrap();

        assert!(endpoint.create_producer(&ctx).is_err());
    }

    /// With `repeatCount=3` the consumer delivers three exchanges, the first
    /// of which carries the timer name and counter value 1 in its headers.
    #[tokio::test]
    async fn test_timer_consumer_fires() {
        let endpoint = TimerComponent::new()
            .create_endpoint("timer:test?period=50&repeatCount=3")
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Collect until three exchanges arrived or the channel closed.
        let mut exchanges = Vec::new();
        while exchanges.len() < 3 {
            match rx.recv().await {
                Some(envelope) => exchanges.push(envelope.exchange),
                None => break,
            }
        }

        assert_eq!(exchanges.len(), 3);

        let head = &exchanges[0];
        let expected_name = serde_json::Value::String("test".into());
        let expected_counter = serde_json::Value::Number(1.into());
        assert_eq!(head.input.header("CamelTimerName"), Some(&expected_name));
        assert_eq!(head.input.header("CamelTimerCounter"), Some(&expected_counter));
    }

    /// Cancelling the token stops a running consumer, and the exchanges
    /// emitted before cancellation are still in the channel.
    #[tokio::test]
    async fn test_timer_consumer_respects_cancellation() {
        use tokio_util::sync::CancellationToken;

        let token = CancellationToken::new();
        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone());

        let config = TimerConfig::from_uri("timer:cancel-test?period=50").unwrap();
        let mut consumer = TimerConsumer { config };

        let task = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Let a few 50 ms ticks go by before asking the consumer to stop.
        tokio::time::sleep(Duration::from_millis(180)).await;
        token.cancel();

        let joined = tokio::time::timeout(Duration::from_secs(1), task).await;
        assert!(
            joined.is_ok(),
            "Consumer should have stopped after cancellation"
        );

        // Drain whatever was queued before the cancellation took effect.
        let count = std::iter::from_fn(|| rx.try_recv().ok()).count();
        assert!(
            count >= 2,
            "Expected at least 2 exchanges before cancellation, got {count}"
        );
    }
}