camel_component_timer/
lib.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::time;
5use tracing::debug;
6
7use camel_component_api::UriConfig;
8use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
9use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
10
11#[derive(Debug, Clone, UriConfig)]
19#[uri_scheme = "timer"]
20#[uri_config(crate = "camel_component_api")]
21pub struct TimerConfig {
22 pub name: String,
24
25 #[allow(dead_code)] #[uri_param(name = "period", default = "1000")]
28 period_ms: u64,
29
30 pub period: Duration,
32
33 #[allow(dead_code)] #[uri_param(name = "delay", default = "0")]
36 delay_ms: u64,
37
38 pub delay: Duration,
40
41 #[uri_param(name = "repeatCount")]
43 pub repeat_count: Option<u32>,
44}
45
46pub struct TimerComponent;
52
53impl TimerComponent {
54 pub fn new() -> Self {
55 Self
56 }
57}
58
59impl Default for TimerComponent {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65impl Component for TimerComponent {
66 fn scheme(&self) -> &str {
67 "timer"
68 }
69
70 fn create_endpoint(
71 &self,
72 uri: &str,
73 _ctx: &dyn camel_component_api::ComponentContext,
74 ) -> Result<Box<dyn Endpoint>, CamelError> {
75 let config = TimerConfig::from_uri(uri)?;
76 Ok(Box::new(TimerEndpoint {
77 uri: uri.to_string(),
78 config,
79 }))
80 }
81}
82
83struct TimerEndpoint {
88 uri: String,
89 config: TimerConfig,
90}
91
92impl Endpoint for TimerEndpoint {
93 fn uri(&self) -> &str {
94 &self.uri
95 }
96
97 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
98 Ok(Box::new(TimerConsumer {
99 config: self.config.clone(),
100 }))
101 }
102
103 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
104 Err(CamelError::EndpointCreationFailed(
105 "timer endpoint does not support producers".to_string(),
106 ))
107 }
108}
109
110struct TimerConsumer {
115 config: TimerConfig,
116}
117
118#[async_trait]
119impl Consumer for TimerConsumer {
120 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
121 let config = self.config.clone();
122
123 if !config.delay.is_zero() {
125 tokio::select! {
126 _ = time::sleep(config.delay) => {}
127 _ = context.cancelled() => {
128 debug!(timer = config.name, "Timer cancelled during initial delay");
129 return Ok(());
130 }
131 }
132 }
133
134 let mut interval = time::interval(config.period);
135 let mut count: u32 = 0;
136
137 loop {
138 tokio::select! {
139 _ = context.cancelled() => {
140 debug!(timer = config.name, "Timer received cancellation, stopping");
141 break;
142 }
143 _ = interval.tick() => {
144 count += 1;
145
146 debug!(timer = config.name, count, "Timer tick");
147
148 let mut exchange = Exchange::new(Message::new(format!(
149 "timer://{} tick #{}",
150 config.name, count
151 )));
152 exchange.input.set_header(
153 "CamelTimerName",
154 serde_json::Value::String(config.name.clone()),
155 );
156 exchange
157 .input
158 .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
159
160 if context.send(exchange).await.is_err() {
161 break;
163 }
164
165 if let Some(max) = config.repeat_count
166 && count >= max
167 {
168 break;
169 }
170 }
171 }
172 }
173
174 Ok(())
175 }
176
177 async fn stop(&mut self) -> Result<(), CamelError> {
178 Ok(())
179 }
180}
181
182#[cfg(test)]
187mod tests {
188 use super::*;
189 use camel_component_api::NoOpComponentContext;
190
191 #[test]
192 fn test_timer_config_defaults() {
193 let config = TimerConfig::from_uri("timer:tick").unwrap();
194 assert_eq!(config.name, "tick");
195 assert_eq!(config.period, Duration::from_millis(1000));
196 assert_eq!(config.delay, Duration::from_millis(0));
197 assert_eq!(config.repeat_count, None);
198 }
199
200 #[test]
201 fn test_timer_config_with_params() {
202 let config =
203 TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
204 assert_eq!(config.name, "myTimer");
205 assert_eq!(config.period, Duration::from_millis(500));
206 assert_eq!(config.delay, Duration::from_millis(100));
207 assert_eq!(config.repeat_count, Some(5));
208 }
209
210 #[test]
211 fn test_timer_config_wrong_scheme() {
212 let result = TimerConfig::from_uri("log:info");
213 assert!(result.is_err());
214 }
215
216 #[test]
217 fn test_timer_component_scheme() {
218 let component = TimerComponent::new();
219 assert_eq!(component.scheme(), "timer");
220 }
221
222 #[test]
223 fn test_timer_component_creates_endpoint() {
224 let component = TimerComponent::new();
225 let endpoint = component.create_endpoint("timer:tick?period=1000", &NoOpComponentContext);
226 assert!(endpoint.is_ok());
227 }
228
229 #[test]
230 fn test_timer_endpoint_no_producer() {
231 let ctx = ProducerContext::new();
232 let component = TimerComponent::new();
233 let endpoint = component
234 .create_endpoint("timer:tick", &NoOpComponentContext)
235 .unwrap();
236 let producer = endpoint.create_producer(&ctx);
237 assert!(producer.is_err());
238 }
239
240 #[tokio::test]
241 async fn test_timer_consumer_fires() {
242 let component = TimerComponent::new();
243 let endpoint = component
244 .create_endpoint("timer:test?period=50&repeatCount=3", &NoOpComponentContext)
245 .unwrap();
246 let mut consumer = endpoint.create_consumer().unwrap();
247
248 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
249 let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
250
251 tokio::spawn(async move {
253 consumer.start(ctx).await.unwrap();
254 });
255
256 let mut received = Vec::new();
258 while let Some(envelope) = rx.recv().await {
259 received.push(envelope.exchange);
260 if received.len() == 3 {
261 break;
262 }
263 }
264
265 assert_eq!(received.len(), 3);
266
267 let first = &received[0];
269 assert_eq!(
270 first.input.header("CamelTimerName"),
271 Some(&serde_json::Value::String("test".into()))
272 );
273 assert_eq!(
274 first.input.header("CamelTimerCounter"),
275 Some(&serde_json::Value::Number(1.into()))
276 );
277 }
278
279 #[tokio::test]
280 async fn test_timer_consumer_respects_cancellation() {
281 use tokio_util::sync::CancellationToken;
282
283 let token = CancellationToken::new();
284 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
285 let ctx = ConsumerContext::new(tx, token.clone());
286
287 let mut consumer = TimerConsumer {
288 config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
289 };
290
291 let handle = tokio::spawn(async move {
292 consumer.start(ctx).await.unwrap();
293 });
294
295 tokio::time::sleep(Duration::from_millis(180)).await;
297 token.cancel();
298
299 let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
300 assert!(
301 result.is_ok(),
302 "Consumer should have stopped after cancellation"
303 );
304
305 let mut count = 0;
306 while rx.try_recv().is_ok() {
307 count += 1;
308 }
309 assert!(
310 count >= 2,
311 "Expected at least 2 exchanges before cancellation, got {count}"
312 );
313 }
314}