1use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use async_trait::async_trait;
16use chrono::Utc;
17use tokio::time;
18use tracing::debug;
19
20use camel_component_api::UriConfig;
21use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
22use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
23
24#[derive(Debug, Clone, UriConfig)]
32#[uri_scheme = "timer"]
33#[uri_config(skip_impl, crate = "camel_component_api")]
34pub struct TimerConfig {
35 pub name: String,
37
38 #[allow(dead_code)] #[uri_param(name = "period", default = "1000")]
41 period_ms: u64,
42
43 pub period: Duration,
45
46 #[allow(dead_code)] #[uri_param(name = "delay", default = "0")]
49 delay_ms: u64,
50
51 pub delay: Duration,
53
54 #[uri_param(name = "repeatCount")]
56 pub repeat_count: Option<u32>,
57
58 #[uri_param(name = "fixedRate", default = "false")]
61 pub fixed_rate: bool,
62
63 #[uri_param(name = "includeMetadata", default = "true")]
67 pub include_metadata: bool,
68}
69
70impl TimerConfig {
72 pub fn validate(&self) -> Result<(), CamelError> {
74 if self.name.trim().is_empty() {
75 return Err(CamelError::InvalidUri(
76 "timer name must not be empty".to_string(),
77 ));
78 }
79 if self.period.is_zero() {
80 return Err(CamelError::InvalidUri(
81 "timer period must be greater than 0".to_string(),
82 ));
83 }
84 Ok(())
85 }
86}
87
88impl UriConfig for TimerConfig {
89 fn scheme() -> &'static str {
90 "timer"
91 }
92
93 fn from_uri(uri: &str) -> Result<Self, CamelError> {
94 let parts = camel_component_api::parse_uri(uri)?;
95 Self::from_components(parts)
96 }
97
98 fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
99 let config = Self::parse_uri_components(parts)?;
100 TimerConfig::validate(&config)?;
101 Ok(config)
102 }
103
104 fn validate(self) -> Result<Self, CamelError> {
105 TimerConfig::validate(&self)?;
107 Ok(self)
108 }
109}
110
/// Component registered under the `timer` scheme. It holds no state.
#[derive(Default)]
pub struct TimerComponent;

impl TimerComponent {
    /// Creates the (stateless) timer component.
    pub fn new() -> Self {
        TimerComponent
    }
}
129
130impl Component for TimerComponent {
131 fn scheme(&self) -> &str {
132 "timer"
133 }
134
135 fn create_endpoint(
136 &self,
137 uri: &str,
138 _ctx: &dyn camel_component_api::ComponentContext,
139 ) -> Result<Box<dyn Endpoint>, CamelError> {
140 let config = TimerConfig::from_uri(uri)?;
141 Ok(Box::new(TimerEndpoint {
142 uri: uri.to_string(),
143 config,
144 }))
145 }
146}
147
148pub struct TimerEndpoint {
153 uri: String,
154 config: TimerConfig,
155}
156
157impl Endpoint for TimerEndpoint {
158 fn uri(&self) -> &str {
159 &self.uri
160 }
161
162 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
163 Ok(Box::new(TimerConsumer {
164 config: self.config.clone(),
165 started: AtomicBool::new(false),
166 }))
167 }
168
169 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
170 Err(CamelError::EndpointCreationFailed(
171 "timer endpoint does not support producers".to_string(),
172 ))
173 }
174}
175
176pub struct TimerConsumer {
181 config: TimerConfig,
182 started: AtomicBool,
184}
185
186#[async_trait]
187impl Consumer for TimerConsumer {
188 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
189 self.started
191 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
192 .map_err(|_| {
193 CamelError::EndpointCreationFailed("timer consumer already started".to_string())
194 })?;
195
196 TimerConfig::validate(&self.config)?;
197 let config = self.config.clone();
198 let cancel_token = context.cancel_token();
199
200 if !config.delay.is_zero() {
202 tokio::select! {
203 _ = time::sleep(config.delay) => {}
204 _ = cancel_token.cancelled() => {
205 debug!(timer = config.name, "Timer cancelled during initial delay");
206 self.started.store(false, Ordering::SeqCst);
207 return Ok(());
208 }
209 }
210 }
211
212 if config.repeat_count == Some(0) {
214 debug!(timer = config.name, "repeat_count=0, timer will not fire");
215 self.started.store(false, Ordering::SeqCst);
216 return Ok(());
217 }
218
219 let mut interval = time::interval(config.period);
220
221 if config.fixed_rate {
223 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
224 } else {
225 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
226 }
227
228 let mut count: u32 = 0;
229
230 loop {
231 tokio::select! {
232 _ = cancel_token.cancelled() => {
233 debug!(timer = config.name, "Timer received cancellation, stopping");
234 break;
235 }
236 _ = interval.tick() => {
237 count += 1;
238
239 debug!(timer = config.name, count, "Timer tick");
240
241 let mut exchange = Exchange::new(Message::new(format!(
242 "timer://{} tick #{}",
243 config.name, count
244 )));
245
246 if config.include_metadata {
248 exchange.input.set_header(
249 "CamelTimerName",
250 serde_json::Value::String(config.name.clone()),
251 );
252 exchange
253 .input
254 .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
255
256 let now = Utc::now();
258 exchange.input.set_header(
259 "CamelTimerFiredTime",
260 serde_json::Value::String(now.to_rfc3339()),
261 );
262 exchange.input.set_header(
263 "CamelMessageTimestamp",
264 serde_json::Value::Number(
265 now.timestamp_millis().into(),
266 ),
267 );
268 }
269
270 if context.send(exchange).await.is_err() {
271 break;
273 }
274
275 if let Some(max) = config.repeat_count
276 && count >= max
277 {
278 break;
279 }
280 }
281 }
282 }
283
284 self.started.store(false, Ordering::SeqCst);
286 Ok(())
287 }
288
289 async fn stop(&mut self) -> Result<(), CamelError> {
290 self.started.store(false, Ordering::SeqCst);
291 debug!(timer = self.config.name, "timer consumer stopped");
292 Ok(())
293 }
294}
295
#[cfg(test)]
impl TimerConsumer {
    /// Test-only hook: marks the consumer as running without starting it, so
    /// the double-start rejection can be exercised.
    pub(crate) fn mark_started_for_test(&self) {
        self.started.store(true, Ordering::SeqCst);
    }
}
303
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::NoOpComponentContext;
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;

    /// Builds a consumer for `uri` through the component -> endpoint path.
    fn consumer_for(uri: &str) -> Box<dyn Consumer> {
        TimerComponent::new()
            .create_endpoint(uri, &NoOpComponentContext)
            .unwrap()
            .create_consumer()
            .unwrap()
    }

    // ---- URI parsing and validation -------------------------------------

    #[test]
    fn test_zero_period_rejected() {
        let err_msg = TimerConfig::from_uri("timer:tick?period=0")
            .expect_err("period=0 should be rejected")
            .to_string();
        assert!(err_msg.contains("period"), "error should mention 'period'");
    }

    #[test]
    fn test_timer_empty_name_rejected() {
        let err = TimerConfig::from_uri("timer:").unwrap_err().to_string();
        assert!(err.contains("must not be empty"), "unexpected error: {err}");
    }

    #[test]
    fn test_timer_config_defaults() {
        let parsed = TimerConfig::from_uri("timer:tick").unwrap();
        assert_eq!(parsed.name, "tick");
        assert_eq!(parsed.period, Duration::from_millis(1000));
        assert_eq!(parsed.delay, Duration::from_millis(0));
        assert_eq!(parsed.repeat_count, None);
    }

    #[test]
    fn test_timer_config_with_params() {
        let parsed =
            TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
        assert_eq!(parsed.name, "myTimer");
        assert_eq!(parsed.period, Duration::from_millis(500));
        assert_eq!(parsed.delay, Duration::from_millis(100));
        assert_eq!(parsed.repeat_count, Some(5));
    }

    #[test]
    fn test_timer_config_wrong_scheme() {
        assert!(TimerConfig::from_uri("log:info").is_err());
    }

    // ---- Component and endpoint -----------------------------------------

    #[test]
    fn test_timer_component_scheme() {
        assert_eq!(TimerComponent::new().scheme(), "timer");
    }

    #[test]
    fn test_timer_component_creates_endpoint() {
        let created =
            TimerComponent::new().create_endpoint("timer:tick?period=1000", &NoOpComponentContext);
        assert!(created.is_ok());
    }

    #[test]
    fn test_timer_endpoint_no_producer() {
        let ctx = ProducerContext::new();
        let endpoint = TimerComponent::new()
            .create_endpoint("timer:tick", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_producer(&ctx).is_err());
    }

    // ---- Direct validation of mutated configs ---------------------------

    #[test]
    fn test_rejects_empty_timer_name() {
        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
        cfg.name = String::new();
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_rejects_zero_period() {
        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
        cfg.period = Duration::ZERO;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_valid_config_passes() {
        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
        cfg.name = "myTimer".into();
        cfg.period = Duration::from_millis(1000);
        assert!(cfg.validate().is_ok());
    }

    // ---- Consumer behaviour ---------------------------------------------

    /// `repeatCount=0` makes `start` return without sending anything.
    #[tokio::test]
    async fn test_repeat_count_zero_fires_never() {
        let mut consumer = consumer_for("timer:zero-test?period=50&repeatCount=0");

        let (tx, mut rx) = mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, CancellationToken::new());

        consumer.start(ctx).await.unwrap();

        // Leave room for several periods in case something fired anyway.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let count = std::iter::from_fn(|| rx.try_recv().ok()).count();
        assert_eq!(
            count, 0,
            "repeat_count=0 should produce zero fires, got {count}"
        );

        consumer.stop().await.unwrap();
    }

    /// Three ticks arrive and the first carries the name and counter headers.
    #[tokio::test]
    async fn test_timer_consumer_fires() {
        let mut consumer = consumer_for("timer:test?period=50&repeatCount=3");

        let (tx, mut rx) = mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, CancellationToken::new());

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        let mut received = Vec::new();
        while received.len() < 3 {
            match rx.recv().await {
                Some(envelope) => received.push(envelope.exchange),
                None => break,
            }
        }

        assert_eq!(received.len(), 3);

        let first = &received[0];
        assert_eq!(
            first.input.header("CamelTimerName"),
            Some(&serde_json::Value::String("test".into()))
        );
        assert_eq!(
            first.input.header("CamelTimerCounter"),
            Some(&serde_json::Value::Number(1.into()))
        );
    }

    /// Cancelling the token ends `start` after some ticks were delivered.
    #[tokio::test]
    async fn test_timer_consumer_respects_cancellation() {
        let token = CancellationToken::new();
        let (tx, mut rx) = mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone());

        let mut consumer = TimerConsumer {
            config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
            started: AtomicBool::new(false),
        };

        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        tokio::time::sleep(Duration::from_millis(180)).await;
        token.cancel();

        let joined = tokio::time::timeout(Duration::from_secs(1), handle).await;
        assert!(
            joined.is_ok(),
            "Consumer should have stopped after cancellation"
        );

        let count = std::iter::from_fn(|| rx.try_recv().ok()).count();
        assert!(
            count >= 2,
            "Expected at least 2 exchanges before cancellation, got {count}"
        );
    }

    /// A running timer delivers ticks; the token is cancelled at the end.
    #[tokio::test]
    async fn test_timer_consumer_stop_shuts_down() {
        let mut consumer = consumer_for("timer:stop-test?period=50");

        let (tx, mut rx) = mpsc::channel(16);
        let token = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, token.clone());

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        tokio::time::sleep(Duration::from_millis(180)).await;

        let count = std::iter::from_fn(|| rx.try_recv().ok()).count();
        assert!(count >= 2, "Expected at least 2 exchanges, got {count}");

        token.cancel();
    }

    // ---- fixedRate parameter --------------------------------------------

    #[test]
    fn test_fixed_rate_default_is_false() {
        let parsed = TimerConfig::from_uri("timer:tick").unwrap();
        assert!(!parsed.fixed_rate, "fixedRate should default to false");
    }

    #[test]
    fn test_fixed_rate_parsed_from_uri() {
        let parsed = TimerConfig::from_uri("timer:tick?fixedRate=true").unwrap();
        assert!(
            parsed.fixed_rate,
            "fixedRate should be true when set in URI"
        );
    }

    /// Starting a consumer that is already marked as started must fail.
    #[tokio::test]
    async fn test_double_start_returns_error() {
        let endpoint = TimerComponent::new()
            .create_endpoint(
                "timer:double?period=50&repeatCount=2",
                &NoOpComponentContext,
            )
            .unwrap();

        let mut consumer = TimerConsumer {
            config: TimerConfig {
                name: "double-test".to_string(),
                period: Duration::from_millis(100),
                period_ms: 100,
                delay: Duration::ZERO,
                delay_ms: 0,
                repeat_count: None,
                fixed_rate: false,
                include_metadata: true,
            },
            started: AtomicBool::new(false),
        };

        consumer.mark_started_for_test();

        let (tx, _rx) = mpsc::channel(16);
        let cancel_token = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel_token.clone());

        let result = consumer.start(ctx).await;
        assert!(result.is_err(), "expected double-start to return Err");
        let err_str = format!("{:?}", result.unwrap_err());
        assert!(
            err_str.contains("already started"),
            "unexpected error: {err_str}"
        );

        drop(endpoint);
    }

    // ---- Metadata headers -----------------------------------------------

    /// The time headers are present and well-formed on a fired exchange.
    #[tokio::test]
    async fn test_timer_fired_time_and_message_timestamp_headers() {
        let mut consumer = consumer_for("timer:headers?period=50&repeatCount=1");

        let (tx, mut rx) = mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, CancellationToken::new());

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        let exchange = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("should receive exchange")
            .expect("envelope should exist")
            .exchange;

        let fired_time = exchange
            .input
            .header("CamelTimerFiredTime")
            .expect("CamelTimerFiredTime header should be present");
        assert!(
            fired_time.is_string(),
            "CamelTimerFiredTime should be a string"
        );
        let fired_str = fired_time.as_str().unwrap();
        assert!(
            chrono::DateTime::parse_from_rfc3339(fired_str).is_ok(),
            "CamelTimerFiredTime should be valid RFC 3339: {fired_str}"
        );

        let msg_ts = exchange
            .input
            .header("CamelMessageTimestamp")
            .expect("CamelMessageTimestamp header should be present");
        assert!(
            msg_ts.is_number(),
            "CamelMessageTimestamp should be a number"
        );
        let ts_millis = msg_ts.as_i64().expect("should be i64");
        assert!(ts_millis > 0, "timestamp should be positive");
    }

    /// Sanity check of the chrono formatting used for the time headers.
    #[test]
    fn test_timer_fired_time_header_format() {
        let now = chrono::Utc::now();
        let rfc = now.to_rfc3339();
        assert!(chrono::DateTime::parse_from_rfc3339(&rfc).is_ok());
        let millis = now.timestamp_millis();
        assert!(millis > 0);
    }

    #[test]
    fn test_include_metadata_default_is_true() {
        let parsed = TimerConfig::from_uri("timer:tick").unwrap();
        assert!(
            parsed.include_metadata,
            "includeMetadata should default to true"
        );
    }

    #[test]
    fn test_include_metadata_false_from_uri() {
        let parsed = TimerConfig::from_uri("timer:tick?includeMetadata=false").unwrap();
        assert!(
            !parsed.include_metadata,
            "includeMetadata should be false when set in URI"
        );
    }

    /// With `includeMetadata=false` none of the four headers is attached.
    #[tokio::test]
    async fn test_include_metadata_false_omits_headers() {
        let mut consumer =
            consumer_for("timer:minimal?period=50&repeatCount=1&includeMetadata=false");

        let (tx, mut rx) = mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, CancellationToken::new());

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        let exchange = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("should receive exchange")
            .expect("envelope should exist")
            .exchange;

        assert!(
            exchange.input.header("CamelTimerName").is_none(),
            "CamelTimerName should not be present when includeMetadata=false"
        );
        assert!(
            exchange.input.header("CamelTimerCounter").is_none(),
            "CamelTimerCounter should not be present when includeMetadata=false"
        );
        assert!(
            exchange.input.header("CamelTimerFiredTime").is_none(),
            "CamelTimerFiredTime should not be present when includeMetadata=false"
        );
        assert!(
            exchange.input.header("CamelMessageTimestamp").is_none(),
            "CamelMessageTimestamp should not be present when includeMetadata=false"
        );
    }

    /// With `includeMetadata=true` all four headers are attached.
    #[tokio::test]
    async fn test_include_metadata_true_includes_all_headers() {
        let mut consumer = consumer_for("timer:full?period=50&repeatCount=1&includeMetadata=true");

        let (tx, mut rx) = mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, CancellationToken::new());

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        let exchange = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("should receive exchange")
            .expect("envelope should exist")
            .exchange;

        assert!(exchange.input.header("CamelTimerName").is_some());
        assert!(exchange.input.header("CamelTimerCounter").is_some());
        assert!(exchange.input.header("CamelTimerFiredTime").is_some());
        assert!(exchange.input.header("CamelMessageTimestamp").is_some());
    }

    /// The endpoint reports the exact URI it was created from.
    #[test]
    fn test_timer_endpoint_is_pub() {
        let endpoint = TimerComponent::new()
            .create_endpoint("timer:pub-test", &NoOpComponentContext)
            .unwrap();
        assert_eq!(endpoint.uri(), "timer:pub-test");
    }
}