camel_component_timer/
lib.rs1use std::time::Duration;
7
8use async_trait::async_trait;
9use tokio::time;
10use tracing::debug;
11
12use camel_component_api::UriConfig;
13use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
14use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
15
/// Configuration of a `timer:` endpoint, parsed from its URI.
///
/// For example `timer:myTimer?period=500&delay=100&repeatCount=5` yields the name
/// `myTimer`, a 500 ms period, a 100 ms initial delay and a limit of 5 ticks, while
/// `timer:tick` yields a 1000 ms period, no delay and no tick limit.
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "timer"]
// `skip_impl`: the `UriConfig` trait impl for this type is written by hand in this file.
// NOTE(review): presumably the derive then only generates helpers such as
// `parse_uri_components` — confirm against the macro documentation.
#[uri_config(skip_impl, crate = "camel_component_api")]
pub struct TimerConfig {
    /// Timer name, i.e. the part of the URI after `timer:` (`timer:tick` -> `tick`).
    pub name: String,

    /// Raw `period` URI parameter, in milliseconds (default `1000`).
    // NOTE(review): never read in this file; presumably consumed by derive-generated
    // code to populate `period`, hence `allow(dead_code)` — confirm.
    #[allow(dead_code)] #[uri_param(name = "period", default = "1000")]
    period_ms: u64,

    /// Interval between ticks; `validate` rejects a zero period.
    pub period: Duration,

    /// Raw `delay` URI parameter, in milliseconds (default `0`).
    // NOTE(review): never read in this file; presumably consumed by derive-generated
    // code to populate `delay`, hence `allow(dead_code)` — confirm.
    #[allow(dead_code)] #[uri_param(name = "delay", default = "0")]
    delay_ms: u64,

    /// Time the consumer waits before the first tick; zero means no initial wait.
    pub delay: Duration,

    /// Optional `repeatCount` URI parameter: the consumer stops once this many ticks
    /// have been sent. `None` (parameter absent) means no limit.
    #[uri_param(name = "repeatCount")]
    pub repeat_count: Option<u32>,
}
50
51impl UriConfig for TimerConfig {
52 fn scheme() -> &'static str {
53 "timer"
54 }
55
56 fn from_uri(uri: &str) -> Result<Self, CamelError> {
57 let parts = camel_component_api::parse_uri(uri)?;
58 Self::from_components(parts)
59 }
60
61 fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
62 let config = Self::parse_uri_components(parts)?;
63 config.validate()
64 }
65
66 fn validate(self) -> Result<Self, CamelError> {
67 if self.name.trim().is_empty() {
68 return Err(CamelError::InvalidUri(
69 "timer name must not be empty".to_string(),
70 ));
71 }
72 if self.period.is_zero() {
73 return Err(CamelError::InvalidUri(
74 "timer period must be greater than 0".to_string(),
75 ));
76 }
77 Ok(self)
78 }
79}
80
/// Stateless component type for the `timer` scheme.
///
/// It carries no data; all per-endpoint state lives in the endpoint it creates.
pub struct TimerComponent;

impl TimerComponent {
    /// Creates a timer component.
    pub fn new() -> Self {
        TimerComponent
    }
}

impl Default for TimerComponent {
    /// Same as [`TimerComponent::new`].
    fn default() -> Self {
        TimerComponent::new()
    }
}
99
100impl Component for TimerComponent {
101 fn scheme(&self) -> &str {
102 "timer"
103 }
104
105 fn create_endpoint(
106 &self,
107 uri: &str,
108 _ctx: &dyn camel_component_api::ComponentContext,
109 ) -> Result<Box<dyn Endpoint>, CamelError> {
110 let config = TimerConfig::from_uri(uri)?;
111 Ok(Box::new(TimerEndpoint {
112 uri: uri.to_string(),
113 config,
114 }))
115 }
116}
117
118struct TimerEndpoint {
123 uri: String,
124 config: TimerConfig,
125}
126
127impl Endpoint for TimerEndpoint {
128 fn uri(&self) -> &str {
129 &self.uri
130 }
131
132 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
133 Ok(Box::new(TimerConsumer {
134 config: self.config.clone(),
135 }))
136 }
137
138 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
139 Err(CamelError::EndpointCreationFailed(
140 "timer endpoint does not support producers".to_string(),
141 ))
142 }
143}
144
145struct TimerConsumer {
150 config: TimerConfig,
151}
152
153#[async_trait]
154impl Consumer for TimerConsumer {
155 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
156 let config = self.config.clone();
157
158 if !config.delay.is_zero() {
160 tokio::select! {
161 _ = time::sleep(config.delay) => {}
162 _ = context.cancelled() => {
163 debug!(timer = config.name, "Timer cancelled during initial delay");
164 return Ok(());
165 }
166 }
167 }
168
169 let mut interval = time::interval(config.period);
170 let mut count: u32 = 0;
171
172 loop {
173 tokio::select! {
174 _ = context.cancelled() => {
175 debug!(timer = config.name, "Timer received cancellation, stopping");
176 break;
177 }
178 _ = interval.tick() => {
179 count += 1;
180
181 debug!(timer = config.name, count, "Timer tick");
182
183 let mut exchange = Exchange::new(Message::new(format!(
184 "timer://{} tick #{}",
185 config.name, count
186 )));
187 exchange.input.set_header(
188 "CamelTimerName",
189 serde_json::Value::String(config.name.clone()),
190 );
191 exchange
192 .input
193 .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
194
195 if context.send(exchange).await.is_err() {
196 break;
198 }
199
200 if let Some(max) = config.repeat_count
201 && count >= max
202 {
203 break;
204 }
205 }
206 }
207 }
208
209 Ok(())
210 }
211
212 async fn stop(&mut self) -> Result<(), CamelError> {
213 Ok(())
214 }
215}
216
// Unit tests for URI parsing/validation, component and endpoint creation, and the
// consumer loop (tick delivery and cancellation).
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::NoOpComponentContext;

    // A zero period must be refused by validation, and the error must name the field.
    #[test]
    fn test_zero_period_rejected() {
        let result = TimerConfig::from_uri("timer:tick?period=0");
        assert!(result.is_err(), "period=0 should be rejected");
        let err_msg = result.unwrap_err().to_string();
        assert!(err_msg.contains("period"), "error should mention 'period'");
    }

    // A URI without a timer name must fail with the "must not be empty" message.
    #[test]
    fn test_timer_empty_name_rejected() {
        let result = TimerConfig::from_uri("timer:");
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("must not be empty"), "unexpected error: {err}");
    }

    // Without query parameters: 1000 ms period, no delay, no repeat limit.
    #[test]
    fn test_timer_config_defaults() {
        let config = TimerConfig::from_uri("timer:tick").unwrap();
        assert_eq!(config.name, "tick");
        assert_eq!(config.period, Duration::from_millis(1000));
        assert_eq!(config.delay, Duration::from_millis(0));
        assert_eq!(config.repeat_count, None);
    }

    // Explicit parameters are interpreted as milliseconds / a tick count.
    #[test]
    fn test_timer_config_with_params() {
        let config =
            TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
        assert_eq!(config.name, "myTimer");
        assert_eq!(config.period, Duration::from_millis(500));
        assert_eq!(config.delay, Duration::from_millis(100));
        assert_eq!(config.repeat_count, Some(5));
    }

    // A URI with a different scheme must not parse as a timer configuration.
    #[test]
    fn test_timer_config_wrong_scheme() {
        let result = TimerConfig::from_uri("log:info");
        assert!(result.is_err());
    }

    #[test]
    fn test_timer_component_scheme() {
        let component = TimerComponent::new();
        assert_eq!(component.scheme(), "timer");
    }

    #[test]
    fn test_timer_component_creates_endpoint() {
        let component = TimerComponent::new();
        let endpoint = component.create_endpoint("timer:tick?period=1000", &NoOpComponentContext);
        assert!(endpoint.is_ok());
    }

    // Timer endpoints are consumer-only: asking for a producer must fail.
    #[test]
    fn test_timer_endpoint_no_producer() {
        let ctx = ProducerContext::new();
        let component = TimerComponent::new();
        let endpoint = component
            .create_endpoint("timer:tick", &NoOpComponentContext)
            .unwrap();
        let producer = endpoint.create_producer(&ctx);
        assert!(producer.is_err());
    }

    // End-to-end: a 50 ms timer limited to 3 ticks delivers 3 exchanges, and the first
    // one carries the timer name and counter value 1 in its headers.
    #[tokio::test]
    async fn test_timer_consumer_fires() {
        let component = TimerComponent::new();
        let endpoint = component
            .create_endpoint("timer:test?period=50&repeatCount=3", &NoOpComponentContext)
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());

        // Run the consumer in the background; the task is detached and the test only
        // observes it through the channel.
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        let mut received = Vec::new();
        // Collect until three exchanges arrived (or the channel closes).
        while let Some(envelope) = rx.recv().await {
            received.push(envelope.exchange);
            if received.len() == 3 {
                break;
            }
        }

        assert_eq!(received.len(), 3);

        let first = &received[0];
        // Headers of the first tick.
        assert_eq!(
            first.input.header("CamelTimerName"),
            Some(&serde_json::Value::String("test".into()))
        );
        assert_eq!(
            first.input.header("CamelTimerCounter"),
            Some(&serde_json::Value::Number(1.into()))
        );
    }

    // An unlimited 50 ms timer must stop promptly once its token is cancelled, having
    // delivered at least two exchanges beforehand.
    #[tokio::test]
    async fn test_timer_consumer_respects_cancellation() {
        use tokio_util::sync::CancellationToken;

        let token = CancellationToken::new();
        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone());

        let mut consumer = TimerConsumer {
            config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
        };

        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Let several 50 ms periods elapse before cancelling.
        tokio::time::sleep(Duration::from_millis(180)).await;
        token.cancel();

        // The consumer task must finish within one second of cancellation.
        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
        assert!(
            result.is_ok(),
            "Consumer should have stopped after cancellation"
        );

        // Drain whatever was delivered before the cancellation took effect.
        let mut count = 0;
        while rx.try_recv().is_ok() {
            count += 1;
        }
        assert!(
            count >= 2,
            "Expected at least 2 exchanges before cancellation, got {count}"
        );
    }
}