1use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use async_trait::async_trait;
16use chrono::Utc;
17use tokio::time;
18use tracing::debug;
19
20use camel_component_api::UriConfig;
21use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
22use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
23
24#[derive(Debug, Clone, UriConfig)]
32#[uri_scheme = "timer"]
33#[uri_config(skip_impl, crate = "camel_component_api")]
34pub struct TimerConfig {
35 pub name: String,
37
38 #[allow(dead_code)] #[uri_param(name = "period", default = "1000")]
41 period_ms: u64,
42
43 pub period: Duration,
45
46 #[allow(dead_code)] #[uri_param(name = "delay", default = "0")]
49 delay_ms: u64,
50
51 pub delay: Duration,
53
54 #[uri_param(name = "repeatCount")]
56 pub repeat_count: Option<u32>,
57
58 #[uri_param(name = "fixedRate", default = "false")]
61 pub fixed_rate: bool,
62
63 #[uri_param(name = "includeMetadata", default = "true")]
67 pub include_metadata: bool,
68}
69
70impl TimerConfig {
72 pub fn validate(&self) -> Result<(), CamelError> {
74 if self.name.trim().is_empty() {
75 return Err(CamelError::InvalidUri(
76 "timer name must not be empty".to_string(),
77 ));
78 }
79 if self.period.is_zero() {
80 return Err(CamelError::InvalidUri(
81 "timer period must be greater than 0".to_string(),
82 ));
83 }
84 Ok(())
85 }
86}
87
88impl UriConfig for TimerConfig {
89 fn scheme() -> &'static str {
90 "timer"
91 }
92
93 fn from_uri(uri: &str) -> Result<Self, CamelError> {
94 let parts = camel_component_api::parse_uri(uri)?;
95 Self::from_components(parts)
96 }
97
98 fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
99 let config = Self::parse_uri_components(parts)?;
100 TimerConfig::validate(&config)?;
101 Ok(config)
102 }
103
104 fn validate(self) -> Result<Self, CamelError> {
105 TimerConfig::validate(&self)?;
107 Ok(self)
108 }
109}
110
/// Stateless component type for the `timer` scheme.
pub struct TimerComponent;

impl TimerComponent {
    /// Creates the component; it carries no state.
    pub fn new() -> Self {
        TimerComponent
    }
}

impl Default for TimerComponent {
    /// Equivalent to [`TimerComponent::new`].
    fn default() -> Self {
        TimerComponent::new()
    }
}
129
130impl Component for TimerComponent {
131 fn scheme(&self) -> &str {
132 "timer"
133 }
134
135 fn create_endpoint(
136 &self,
137 uri: &str,
138 _ctx: &dyn camel_component_api::ComponentContext,
139 ) -> Result<Box<dyn Endpoint>, CamelError> {
140 let config = TimerConfig::from_uri(uri)?;
141 Ok(Box::new(TimerEndpoint {
142 uri: uri.to_string(),
143 config,
144 }))
145 }
146}
147
148pub struct TimerEndpoint {
153 uri: String,
154 config: TimerConfig,
155}
156
157impl Endpoint for TimerEndpoint {
158 fn uri(&self) -> &str {
159 &self.uri
160 }
161
162 fn create_consumer(
163 &self,
164 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
165 ) -> Result<Box<dyn Consumer>, CamelError> {
166 Ok(Box::new(TimerConsumer {
167 config: self.config.clone(),
168 started: AtomicBool::new(false),
169 }))
170 }
171
172 fn create_producer(
173 &self,
174 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
175 _ctx: &ProducerContext,
176 ) -> Result<BoxProcessor, CamelError> {
177 Err(CamelError::EndpointCreationFailed(
178 "timer endpoint does not support producers".to_string(),
179 ))
180 }
181}
182
183pub struct TimerConsumer {
188 config: TimerConfig,
189 started: AtomicBool,
191}
192
193#[async_trait]
194impl Consumer for TimerConsumer {
195 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
196 self.started
198 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
199 .map_err(|_| {
200 CamelError::EndpointCreationFailed("timer consumer already started".to_string())
201 })?;
202
203 TimerConfig::validate(&self.config)?;
204 let config = self.config.clone();
205 let cancel_token = context.cancel_token();
206
207 if !config.delay.is_zero() {
209 tokio::select! {
210 _ = time::sleep(config.delay) => {}
211 _ = cancel_token.cancelled() => {
212 debug!(timer = config.name, "Timer cancelled during initial delay");
213 self.started.store(false, Ordering::SeqCst);
214 return Ok(());
215 }
216 }
217 }
218
219 if config.repeat_count == Some(0) {
221 debug!(timer = config.name, "repeat_count=0, timer will not fire");
222 self.started.store(false, Ordering::SeqCst);
223 return Ok(());
224 }
225
226 let mut interval = time::interval(config.period);
227
228 if config.fixed_rate {
230 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
231 } else {
232 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
233 }
234
235 let mut count: u32 = 0;
236
237 loop {
238 tokio::select! {
239 _ = cancel_token.cancelled() => {
240 debug!(timer = config.name, "Timer received cancellation, stopping");
241 break;
242 }
243 _ = interval.tick() => {
244 count += 1;
245
246 debug!(timer = config.name, count, "Timer tick");
247
248 let mut exchange = Exchange::new(Message::new(format!(
249 "timer://{} tick #{}",
250 config.name, count
251 )));
252
253 if config.include_metadata {
255 exchange.input.set_header(
256 "CamelTimerName",
257 serde_json::Value::String(config.name.clone()),
258 );
259 exchange
260 .input
261 .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
262
263 let now = Utc::now();
265 exchange.input.set_header(
266 "CamelTimerFiredTime",
267 serde_json::Value::String(now.to_rfc3339()),
268 );
269 exchange.input.set_header(
270 "CamelMessageTimestamp",
271 serde_json::Value::Number(
272 now.timestamp_millis().into(),
273 ),
274 );
275 }
276
277 if context.send(exchange).await.is_err() {
278 break;
280 }
281
282 if let Some(max) = config.repeat_count
283 && count >= max
284 {
285 break;
286 }
287 }
288 }
289 }
290
291 self.started.store(false, Ordering::SeqCst);
293 Ok(())
294 }
295
296 async fn stop(&mut self) -> Result<(), CamelError> {
297 self.started.store(false, Ordering::SeqCst);
298 debug!(timer = self.config.name, "timer consumer stopped");
299 Ok(())
300 }
301}
302
#[cfg(test)]
impl TimerConsumer {
    /// Test-only hook that forces the `started` flag to `true`, so the
    /// double-start rejection can be exercised without a running timer.
    pub(crate) fn mark_started_for_test(&self) {
        self.started.store(true, Ordering::SeqCst);
    }
}
310
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::NoOpComponentContext;
    use camel_component_api::test_support::PanicRuntimeObservability;

    /// Runtime-observability stand-in passed to the endpoint factory methods.
    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
        std::sync::Arc::new(PanicRuntimeObservability)
    }

    /// Builds a consumer for `uri` through the component and endpoint factories.
    fn consumer_for(uri: &str) -> Box<dyn Consumer> {
        TimerComponent::new()
            .create_endpoint(uri, &NoOpComponentContext)
            .unwrap()
            .create_consumer(rt())
            .unwrap()
    }

    // ---- URI parsing and validation ----

    #[test]
    fn test_zero_period_rejected() {
        let result = TimerConfig::from_uri("timer:tick?period=0");
        assert!(result.is_err(), "period=0 should be rejected");
        let err_msg = result.unwrap_err().to_string();
        assert!(err_msg.contains("period"), "error should mention 'period'");
    }

    #[test]
    fn test_timer_empty_name_rejected() {
        let result = TimerConfig::from_uri("timer:");
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("must not be empty"), "unexpected error: {err}");
    }

    #[test]
    fn test_timer_config_defaults() {
        let config = TimerConfig::from_uri("timer:tick").unwrap();
        assert_eq!(config.name, "tick");
        assert_eq!(config.period, Duration::from_millis(1000));
        assert_eq!(config.delay, Duration::from_millis(0));
        assert_eq!(config.repeat_count, None);
    }

    #[test]
    fn test_timer_config_with_params() {
        let config =
            TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
        assert_eq!(config.name, "myTimer");
        assert_eq!(config.period, Duration::from_millis(500));
        assert_eq!(config.delay, Duration::from_millis(100));
        assert_eq!(config.repeat_count, Some(5));
    }

    #[test]
    fn test_timer_config_wrong_scheme() {
        let result = TimerConfig::from_uri("log:info");
        assert!(result.is_err());
    }

    #[test]
    fn test_rejects_empty_timer_name() {
        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
        cfg.name = "".into();
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_rejects_zero_period() {
        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
        cfg.period = Duration::ZERO;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_valid_config_passes() {
        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
        cfg.name = "myTimer".into();
        cfg.period = Duration::from_millis(1000);
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn test_fixed_rate_default_is_false() {
        let config = TimerConfig::from_uri("timer:tick").unwrap();
        assert!(!config.fixed_rate, "fixedRate should default to false");
    }

    #[test]
    fn test_fixed_rate_parsed_from_uri() {
        let config = TimerConfig::from_uri("timer:tick?fixedRate=true").unwrap();
        assert!(
            config.fixed_rate,
            "fixedRate should be true when set in URI"
        );
    }

    #[test]
    fn test_include_metadata_default_is_true() {
        let config = TimerConfig::from_uri("timer:tick").unwrap();
        assert!(
            config.include_metadata,
            "includeMetadata should default to true"
        );
    }

    #[test]
    fn test_include_metadata_false_from_uri() {
        let config = TimerConfig::from_uri("timer:tick?includeMetadata=false").unwrap();
        assert!(
            !config.include_metadata,
            "includeMetadata should be false when set in URI"
        );
    }

    // ---- Component and endpoint ----

    #[test]
    fn test_timer_component_scheme() {
        let component = TimerComponent::new();
        assert_eq!(component.scheme(), "timer");
    }

    #[test]
    fn test_timer_component_creates_endpoint() {
        let component = TimerComponent::new();
        let endpoint = component.create_endpoint("timer:tick?period=1000", &NoOpComponentContext);
        assert!(endpoint.is_ok());
    }

    #[test]
    fn test_timer_endpoint_no_producer() {
        let ctx = ProducerContext::new();
        let component = TimerComponent::new();
        let endpoint = component
            .create_endpoint("timer:tick", &NoOpComponentContext)
            .unwrap();
        let producer = endpoint.create_producer(rt(), &ctx);
        assert!(producer.is_err());
    }

    #[test]
    fn test_timer_endpoint_is_pub() {
        let component = TimerComponent::new();
        let endpoint = component
            .create_endpoint("timer:pub-test", &NoOpComponentContext)
            .unwrap();
        assert_eq!(endpoint.uri(), "timer:pub-test");
    }

    // ---- Consumer behavior ----

    #[tokio::test]
    async fn test_repeat_count_zero_fires_never() {
        let mut consumer = consumer_for("timer:zero-test?period=50&repeatCount=0");

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(
            tx,
            tokio_util::sync::CancellationToken::new(),
            "timer-test-route".to_string(),
        );

        // With repeatCount=0 `start` returns without entering the tick loop.
        consumer.start(ctx).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        let mut count = 0;
        while rx.try_recv().is_ok() {
            count += 1;
        }
        assert_eq!(
            count, 0,
            "repeat_count=0 should produce zero fires, got {count}"
        );

        consumer.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_timer_consumer_fires() {
        let mut consumer = consumer_for("timer:test?period=50&repeatCount=3");

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(
            tx,
            tokio_util::sync::CancellationToken::new(),
            "timer-test-route".to_string(),
        );

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Each receive is bounded so a regression fails the test instead of
        // hanging the whole suite.
        let mut received = Vec::new();
        while received.len() < 3 {
            let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
                .await
                .expect("timed out waiting for a timer exchange")
                .expect("channel closed before three exchanges arrived");
            received.push(envelope.exchange);
        }

        assert_eq!(received.len(), 3);

        let first = &received[0];
        assert_eq!(
            first.input.header("CamelTimerName"),
            Some(&serde_json::Value::String("test".into()))
        );
        assert_eq!(
            first.input.header("CamelTimerCounter"),
            Some(&serde_json::Value::Number(1.into()))
        );
    }

    #[tokio::test]
    async fn test_timer_consumer_respects_cancellation() {
        use tokio_util::sync::CancellationToken;

        let token = CancellationToken::new();
        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone(), "timer-test-route".to_string());

        let mut consumer = TimerConsumer {
            config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
            started: AtomicBool::new(false),
        };

        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        tokio::time::sleep(Duration::from_millis(180)).await;
        token.cancel();

        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
        assert!(
            result.is_ok(),
            "Consumer should have stopped after cancellation"
        );

        let mut count = 0;
        while rx.try_recv().is_ok() {
            count += 1;
        }
        assert!(
            count >= 2,
            "Expected at least 2 exchanges before cancellation, got {count}"
        );
    }

    #[tokio::test]
    async fn test_timer_consumer_stop_shuts_down() {
        let mut consumer = consumer_for("timer:stop-test?period=50");

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let token = tokio_util::sync::CancellationToken::new();
        let ctx = ConsumerContext::new(tx, token.clone(), "timer-test-route".to_string());

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        tokio::time::sleep(Duration::from_millis(180)).await;

        let mut count = 0;
        while rx.try_recv().is_ok() {
            count += 1;
        }
        assert!(count >= 2, "Expected at least 2 exchanges, got {count}");

        // The consumer was moved into the task, so shutdown goes through the token.
        token.cancel();
    }

    #[tokio::test]
    async fn test_double_start_returns_error() {
        let mut consumer = TimerConsumer {
            config: TimerConfig {
                name: "double-test".to_string(),
                period: Duration::from_millis(100),
                period_ms: 100,
                delay: Duration::ZERO,
                delay_ms: 0,
                repeat_count: None,
                fixed_rate: false,
                include_metadata: true,
            },
            started: AtomicBool::new(false),
        };

        // Simulate an already-running consumer without spawning a timer.
        consumer.mark_started_for_test();

        let (tx, _rx) = tokio::sync::mpsc::channel(16);
        let cancel_token = tokio_util::sync::CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel_token.clone(), "timer-test-route".to_string());

        let result = consumer.start(ctx).await;
        assert!(result.is_err(), "expected double-start to return Err");
        let err_str = format!("{:?}", result.unwrap_err());
        assert!(
            err_str.contains("already started"),
            "unexpected error: {err_str}"
        );
    }

    // ---- Metadata headers ----

    #[tokio::test]
    async fn test_timer_fired_time_and_message_timestamp_headers() {
        let mut consumer = consumer_for("timer:headers?period=50&repeatCount=1");

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(
            tx,
            tokio_util::sync::CancellationToken::new(),
            "timer-test-route".to_string(),
        );

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("should receive exchange")
            .expect("envelope should exist");

        let exchange = envelope.exchange;

        let fired_time = exchange
            .input
            .header("CamelTimerFiredTime")
            .expect("CamelTimerFiredTime header should be present");
        assert!(
            fired_time.is_string(),
            "CamelTimerFiredTime should be a string"
        );
        let fired_str = fired_time.as_str().unwrap();
        assert!(
            chrono::DateTime::parse_from_rfc3339(fired_str).is_ok(),
            "CamelTimerFiredTime should be valid RFC 3339: {fired_str}"
        );

        let msg_ts = exchange
            .input
            .header("CamelMessageTimestamp")
            .expect("CamelMessageTimestamp header should be present");
        assert!(
            msg_ts.is_number(),
            "CamelMessageTimestamp should be a number"
        );
        let ts_millis = msg_ts.as_i64().expect("should be i64");
        assert!(ts_millis > 0, "timestamp should be positive");
    }

    #[test]
    fn test_timer_fired_time_header_format() {
        // Sanity check of the chrono formatting used for the time headers.
        let now = chrono::Utc::now();
        let rfc = now.to_rfc3339();
        assert!(chrono::DateTime::parse_from_rfc3339(&rfc).is_ok());
        let millis = now.timestamp_millis();
        assert!(millis > 0);
    }

    #[tokio::test]
    async fn test_include_metadata_false_omits_headers() {
        let mut consumer =
            consumer_for("timer:minimal?period=50&repeatCount=1&includeMetadata=false");

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(
            tx,
            tokio_util::sync::CancellationToken::new(),
            "timer-test-route".to_string(),
        );

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("should receive exchange")
            .expect("envelope should exist");

        let exchange = envelope.exchange;

        assert!(
            exchange.input.header("CamelTimerName").is_none(),
            "CamelTimerName should not be present when includeMetadata=false"
        );
        assert!(
            exchange.input.header("CamelTimerCounter").is_none(),
            "CamelTimerCounter should not be present when includeMetadata=false"
        );
        assert!(
            exchange.input.header("CamelTimerFiredTime").is_none(),
            "CamelTimerFiredTime should not be present when includeMetadata=false"
        );
        assert!(
            exchange.input.header("CamelMessageTimestamp").is_none(),
            "CamelMessageTimestamp should not be present when includeMetadata=false"
        );
    }

    #[tokio::test]
    async fn test_include_metadata_true_includes_all_headers() {
        let mut consumer = consumer_for("timer:full?period=50&repeatCount=1&includeMetadata=true");

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(
            tx,
            tokio_util::sync::CancellationToken::new(),
            "timer-test-route".to_string(),
        );

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("should receive exchange")
            .expect("envelope should exist");

        let exchange = envelope.exchange;

        assert!(exchange.input.header("CamelTimerName").is_some());
        assert!(exchange.input.header("CamelTimerCounter").is_some());
        assert!(exchange.input.header("CamelTimerFiredTime").is_some());
        assert!(exchange.input.header("CamelMessageTimestamp").is_some());
    }
}