1use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use async_trait::async_trait;
16use chrono::Utc;
17use tokio::time;
18use tracing::debug;
19
20use camel_component_api::UriConfig;
21use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
22use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
23
24#[derive(Debug, Clone, UriConfig)]
32#[uri_scheme = "timer"]
33#[uri_config(skip_impl, crate = "camel_component_api")]
34pub struct TimerConfig {
35 pub name: String,
37
38 #[allow(dead_code)] #[uri_param(name = "period", default = "1000")]
41 period_ms: u64,
42
43 pub period: Duration,
45
46 #[allow(dead_code)] #[uri_param(name = "delay", default = "0")]
49 delay_ms: u64,
50
51 pub delay: Duration,
53
54 #[uri_param(name = "repeatCount")]
56 pub repeat_count: Option<u32>,
57
58 #[uri_param(name = "fixedRate", default = "false")]
61 pub fixed_rate: bool,
62
63 #[uri_param(name = "includeMetadata", default = "true")]
67 pub include_metadata: bool,
68}
69
70impl TimerConfig {
72 pub fn validate(&self) -> Result<(), CamelError> {
74 if self.name.trim().is_empty() {
75 return Err(CamelError::InvalidUri(
76 "timer name must not be empty".to_string(),
77 ));
78 }
79 if self.period.is_zero() {
80 return Err(CamelError::InvalidUri(
81 "timer period must be greater than 0".to_string(),
82 ));
83 }
84 Ok(())
85 }
86}
87
88impl UriConfig for TimerConfig {
89 fn scheme() -> &'static str {
90 "timer"
91 }
92
93 fn from_uri(uri: &str) -> Result<Self, CamelError> {
94 let parts = camel_component_api::parse_uri(uri)?;
95 Self::from_components(parts)
96 }
97
98 fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
99 let config = Self::parse_uri_components(parts)?;
100 TimerConfig::validate(&config)?;
101 Ok(config)
102 }
103
104 fn validate(self) -> Result<Self, CamelError> {
105 TimerConfig::validate(&self)?;
107 Ok(self)
108 }
109}
110
111pub struct TimerComponent;
117
118impl TimerComponent {
119 pub fn new() -> Self {
120 Self
121 }
122}
123
124impl Default for TimerComponent {
125 fn default() -> Self {
126 Self::new()
127 }
128}
129
130impl Component for TimerComponent {
131 fn scheme(&self) -> &str {
132 "timer"
133 }
134
135 fn create_endpoint(
136 &self,
137 uri: &str,
138 _ctx: &dyn camel_component_api::ComponentContext,
139 ) -> Result<Box<dyn Endpoint>, CamelError> {
140 let config = TimerConfig::from_uri(uri)?;
141 Ok(Box::new(TimerEndpoint {
142 uri: uri.to_string(),
143 config,
144 }))
145 }
146}
147
148pub struct TimerEndpoint {
153 uri: String,
154 config: TimerConfig,
155}
156
157impl Endpoint for TimerEndpoint {
158 fn uri(&self) -> &str {
159 &self.uri
160 }
161
162 fn create_consumer(
163 &self,
164 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
165 ) -> Result<Box<dyn Consumer>, CamelError> {
166 Ok(Box::new(TimerConsumer {
167 config: self.config.clone(),
168 started: AtomicBool::new(false),
169 }))
170 }
171
172 fn create_producer(
173 &self,
174 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
175 _ctx: &ProducerContext,
176 ) -> Result<BoxProcessor, CamelError> {
177 Err(CamelError::EndpointCreationFailed(
178 "timer endpoint does not support producers".to_string(),
179 ))
180 }
181}
182
183pub struct TimerConsumer {
188 config: TimerConfig,
189 started: AtomicBool,
191}
192
193#[async_trait]
194impl Consumer for TimerConsumer {
195 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
196 self.started
198 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
199 .map_err(|_| {
200 CamelError::EndpointCreationFailed("timer consumer already started".to_string())
201 })?;
202
203 TimerConfig::validate(&self.config)?;
204 let config = self.config.clone();
205 let cancel_token = context.cancel_token();
206
207 if !config.delay.is_zero() {
209 tokio::select! {
210 _ = time::sleep(config.delay) => {}
211 _ = cancel_token.cancelled() => {
212 debug!(timer = config.name, "Timer cancelled during initial delay");
213 self.started.store(false, Ordering::SeqCst);
214 return Ok(());
215 }
216 }
217 }
218
219 if config.repeat_count == Some(0) {
221 debug!(timer = config.name, "repeat_count=0, timer will not fire");
222 self.started.store(false, Ordering::SeqCst);
223 return Ok(());
224 }
225
226 let mut interval = time::interval(config.period);
227
228 if config.fixed_rate {
230 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
231 } else {
232 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
233 }
234
235 let mut count: u32 = 0;
236
237 loop {
238 tokio::select! {
239 _ = cancel_token.cancelled() => {
240 debug!(timer = config.name, "Timer received cancellation, stopping");
241 break;
242 }
243 _ = interval.tick() => {
244 count += 1;
245
246 debug!(timer = config.name, count, "Timer tick");
247
248 let mut exchange = Exchange::new(Message::new(format!(
249 "timer://{} tick #{}",
250 config.name, count
251 )));
252
253 if config.include_metadata {
255 exchange.input.set_header(
256 "CamelTimerName",
257 serde_json::Value::String(config.name.clone()),
258 );
259 exchange
260 .input
261 .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
262
263 let now = Utc::now();
265 exchange.input.set_header(
266 "CamelTimerFiredTime",
267 serde_json::Value::String(now.to_rfc3339()),
268 );
269 exchange.input.set_header(
270 "CamelMessageTimestamp",
271 serde_json::Value::Number(
272 now.timestamp_millis().into(),
273 ),
274 );
275 }
276
277 if context.send(exchange).await.is_err() {
278 break;
280 }
281
282 if let Some(max) = config.repeat_count
283 && count >= max
284 {
285 break;
286 }
287 }
288 }
289 }
290
291 self.started.store(false, Ordering::SeqCst);
293 Ok(())
294 }
295
296 async fn stop(&mut self) -> Result<(), CamelError> {
297 self.started.store(false, Ordering::SeqCst);
298 debug!(timer = self.config.name, "timer consumer stopped");
299 Ok(())
300 }
301}
302
303impl TimerConsumer {
304 #[cfg(test)]
306 pub(crate) fn mark_started_for_test(&self) {
307 self.started.store(true, Ordering::SeqCst);
308 }
309}
310
311#[cfg(test)]
316mod tests {
317 use camel_component_api::test_support::PanicRuntimeObservability;
318 fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
319 std::sync::Arc::new(PanicRuntimeObservability)
320 }
321
322 use super::*;
323 use camel_component_api::NoOpComponentContext;
324
325 #[test]
326 fn test_zero_period_rejected() {
327 let result = TimerConfig::from_uri("timer:tick?period=0");
328 assert!(result.is_err(), "period=0 should be rejected");
329 let err_msg = result.unwrap_err().to_string();
330 assert!(err_msg.contains("period"), "error should mention 'period'");
331 }
332
333 #[test]
334 fn test_timer_empty_name_rejected() {
335 let result = TimerConfig::from_uri("timer:");
336 assert!(result.is_err());
337 let err = result.unwrap_err().to_string();
338 assert!(err.contains("must not be empty"), "unexpected error: {err}");
339 }
340
341 #[test]
342 fn test_timer_config_defaults() {
343 let config = TimerConfig::from_uri("timer:tick").unwrap();
344 assert_eq!(config.name, "tick");
345 assert_eq!(config.period, Duration::from_millis(1000));
346 assert_eq!(config.delay, Duration::from_millis(0));
347 assert_eq!(config.repeat_count, None);
348 }
349
350 #[test]
351 fn test_timer_config_with_params() {
352 let config =
353 TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
354 assert_eq!(config.name, "myTimer");
355 assert_eq!(config.period, Duration::from_millis(500));
356 assert_eq!(config.delay, Duration::from_millis(100));
357 assert_eq!(config.repeat_count, Some(5));
358 }
359
360 #[test]
361 fn test_timer_config_wrong_scheme() {
362 let result = TimerConfig::from_uri("log:info");
363 assert!(result.is_err());
364 }
365
366 #[test]
367 fn test_timer_component_scheme() {
368 let component = TimerComponent::new();
369 assert_eq!(component.scheme(), "timer");
370 }
371
372 #[test]
373 fn test_timer_component_creates_endpoint() {
374 let component = TimerComponent::new();
375 let endpoint = component.create_endpoint("timer:tick?period=1000", &NoOpComponentContext);
376 assert!(endpoint.is_ok());
377 }
378
379 #[test]
380 fn test_timer_endpoint_no_producer() {
381 let ctx = ProducerContext::new();
382 let component = TimerComponent::new();
383 let endpoint = component
384 .create_endpoint("timer:tick", &NoOpComponentContext)
385 .unwrap();
386 let producer = endpoint.create_producer(rt(), &ctx);
387 assert!(producer.is_err());
388 }
389
390 #[test]
391 fn test_rejects_empty_timer_name() {
392 let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
393 cfg.name = "".into();
394 assert!(cfg.validate().is_err());
395 }
396
397 #[test]
398 fn test_rejects_zero_period() {
399 let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
400 cfg.period = Duration::ZERO;
401 assert!(cfg.validate().is_err());
402 }
403
404 #[test]
405 fn test_valid_config_passes() {
406 let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
407 cfg.name = "myTimer".into();
408 cfg.period = Duration::from_millis(1000);
409 assert!(cfg.validate().is_ok());
410 }
411
412 #[tokio::test]
413 async fn test_repeat_count_zero_fires_never() {
414 let component = TimerComponent::new();
415 let endpoint = component
416 .create_endpoint(
417 "timer:zero-test?period=50&repeatCount=0",
418 &NoOpComponentContext,
419 )
420 .unwrap();
421 let mut consumer = endpoint.create_consumer(rt()).unwrap();
422
423 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
424 let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
425
426 consumer.start(ctx).await.unwrap();
428
429 tokio::time::sleep(Duration::from_millis(200)).await;
431
432 let mut count = 0;
434 while rx.try_recv().is_ok() {
435 count += 1;
436 }
437 assert_eq!(
438 count, 0,
439 "repeat_count=0 should produce zero fires, got {count}"
440 );
441
442 consumer.stop().await.unwrap();
444 }
445
446 #[tokio::test]
447 async fn test_timer_consumer_fires() {
448 let component = TimerComponent::new();
449 let endpoint = component
450 .create_endpoint("timer:test?period=50&repeatCount=3", &NoOpComponentContext)
451 .unwrap();
452 let mut consumer = endpoint.create_consumer(rt()).unwrap();
453
454 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
455 let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
456
457 tokio::spawn(async move {
459 consumer.start(ctx).await.unwrap();
460 });
461
462 let mut received = Vec::new();
464 while let Some(envelope) = rx.recv().await {
465 received.push(envelope.exchange);
466 if received.len() == 3 {
467 break;
468 }
469 }
470
471 assert_eq!(received.len(), 3);
472
473 let first = &received[0];
475 assert_eq!(
476 first.input.header("CamelTimerName"),
477 Some(&serde_json::Value::String("test".into()))
478 );
479 assert_eq!(
480 first.input.header("CamelTimerCounter"),
481 Some(&serde_json::Value::Number(1.into()))
482 );
483 }
484
485 #[tokio::test]
486 async fn test_timer_consumer_respects_cancellation() {
487 use tokio_util::sync::CancellationToken;
488
489 let token = CancellationToken::new();
490 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
491 let ctx = ConsumerContext::new(tx, token.clone());
492
493 let mut consumer = TimerConsumer {
494 config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
495 started: AtomicBool::new(false),
496 };
497
498 let handle = tokio::spawn(async move {
499 consumer.start(ctx).await.unwrap();
500 });
501
502 tokio::time::sleep(Duration::from_millis(180)).await;
504 token.cancel();
505
506 let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
507 assert!(
508 result.is_ok(),
509 "Consumer should have stopped after cancellation"
510 );
511
512 let mut count = 0;
513 while rx.try_recv().is_ok() {
514 count += 1;
515 }
516 assert!(
517 count >= 2,
518 "Expected at least 2 exchanges before cancellation, got {count}"
519 );
520 }
521
522 #[tokio::test]
523 async fn test_timer_consumer_stop_shuts_down() {
524 let component = TimerComponent::new();
525 let endpoint = component
526 .create_endpoint("timer:stop-test?period=50", &NoOpComponentContext)
527 .unwrap();
528 let mut consumer = endpoint.create_consumer(rt()).unwrap();
529
530 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
531 let token = tokio_util::sync::CancellationToken::new();
532 let ctx = ConsumerContext::new(tx, token.clone());
533
534 tokio::spawn(async move {
536 consumer.start(ctx).await.unwrap();
537 });
538
539 tokio::time::sleep(Duration::from_millis(180)).await;
541
542 let mut count = 0;
544 while rx.try_recv().is_ok() {
545 count += 1;
546 }
547 assert!(count >= 2, "Expected at least 2 exchanges, got {count}");
548
549 token.cancel();
551 }
552
553 #[test]
555 fn test_fixed_rate_default_is_false() {
556 let config = TimerConfig::from_uri("timer:tick").unwrap();
557 assert!(!config.fixed_rate, "fixedRate should default to false");
558 }
559
560 #[test]
561 fn test_fixed_rate_parsed_from_uri() {
562 let config = TimerConfig::from_uri("timer:tick?fixedRate=true").unwrap();
563 assert!(
564 config.fixed_rate,
565 "fixedRate should be true when set in URI"
566 );
567 }
568
569 #[tokio::test]
571 async fn test_double_start_returns_error() {
572 let component = TimerComponent::new();
573 let endpoint = component
574 .create_endpoint(
575 "timer:double?period=50&repeatCount=2",
576 &NoOpComponentContext,
577 )
578 .unwrap(); let mut consumer = TimerConsumer {
581 config: TimerConfig {
582 name: "double-test".to_string(),
583 period: Duration::from_millis(100),
584 period_ms: 100,
585 delay: Duration::ZERO,
586 delay_ms: 0,
587 repeat_count: None,
588 fixed_rate: false,
589 include_metadata: true,
590 },
591 started: AtomicBool::new(false),
592 };
593
594 consumer.mark_started_for_test();
596
597 let (tx, _rx) = tokio::sync::mpsc::channel(16);
598 let cancel_token = tokio_util::sync::CancellationToken::new();
599 let ctx = ConsumerContext::new(tx, cancel_token.clone());
600
601 let result = consumer.start(ctx).await;
603 assert!(result.is_err(), "expected double-start to return Err");
604 let err_str = format!("{:?}", result.unwrap_err());
605 assert!(
606 err_str.contains("already started"),
607 "unexpected error: {err_str}"
608 );
609
610 drop(endpoint); }
612
613 #[tokio::test]
615 async fn test_timer_fired_time_and_message_timestamp_headers() {
616 let component = TimerComponent::new();
617 let endpoint = component
618 .create_endpoint(
619 "timer:headers?period=50&repeatCount=1",
620 &NoOpComponentContext,
621 )
622 .unwrap();
623 let mut consumer = endpoint.create_consumer(rt()).unwrap();
624
625 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
626 let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
627
628 tokio::spawn(async move {
629 consumer.start(ctx).await.unwrap();
630 });
631
632 let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
633 .await
634 .expect("should receive exchange")
635 .expect("envelope should exist");
636
637 let exchange = envelope.exchange;
638
639 let fired_time = exchange
641 .input
642 .header("CamelTimerFiredTime")
643 .expect("CamelTimerFiredTime header should be present");
644 assert!(
645 fired_time.is_string(),
646 "CamelTimerFiredTime should be a string"
647 );
648 let fired_str = fired_time.as_str().unwrap();
649 assert!(
651 chrono::DateTime::parse_from_rfc3339(fired_str).is_ok(),
652 "CamelTimerFiredTime should be valid RFC 3339: {fired_str}"
653 );
654
655 let msg_ts = exchange
657 .input
658 .header("CamelMessageTimestamp")
659 .expect("CamelMessageTimestamp header should be present");
660 assert!(
661 msg_ts.is_number(),
662 "CamelMessageTimestamp should be a number"
663 );
664 let ts_millis = msg_ts.as_i64().expect("should be i64");
665 assert!(ts_millis > 0, "timestamp should be positive");
666 }
667
668 #[test]
669 fn test_timer_fired_time_header_format() {
670 let now = chrono::Utc::now();
672 let rfc = now.to_rfc3339();
673 assert!(chrono::DateTime::parse_from_rfc3339(&rfc).is_ok());
674 let millis = now.timestamp_millis();
675 assert!(millis > 0);
676 }
677
678 #[test]
680 fn test_include_metadata_default_is_true() {
681 let config = TimerConfig::from_uri("timer:tick").unwrap();
682 assert!(
683 config.include_metadata,
684 "includeMetadata should default to true"
685 );
686 }
687
688 #[test]
689 fn test_include_metadata_false_from_uri() {
690 let config = TimerConfig::from_uri("timer:tick?includeMetadata=false").unwrap();
691 assert!(
692 !config.include_metadata,
693 "includeMetadata should be false when set in URI"
694 );
695 }
696
697 #[tokio::test]
698 async fn test_include_metadata_false_omits_headers() {
699 let component = TimerComponent::new();
700 let endpoint = component
701 .create_endpoint(
702 "timer:minimal?period=50&repeatCount=1&includeMetadata=false",
703 &NoOpComponentContext,
704 )
705 .unwrap();
706 let mut consumer = endpoint.create_consumer(rt()).unwrap();
707
708 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
709 let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
710
711 tokio::spawn(async move {
712 consumer.start(ctx).await.unwrap();
713 });
714
715 let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
716 .await
717 .expect("should receive exchange")
718 .expect("envelope should exist");
719
720 let exchange = envelope.exchange;
721
722 assert!(
724 exchange.input.header("CamelTimerName").is_none(),
725 "CamelTimerName should not be present when includeMetadata=false"
726 );
727 assert!(
728 exchange.input.header("CamelTimerCounter").is_none(),
729 "CamelTimerCounter should not be present when includeMetadata=false"
730 );
731 assert!(
732 exchange.input.header("CamelTimerFiredTime").is_none(),
733 "CamelTimerFiredTime should not be present when includeMetadata=false"
734 );
735 assert!(
736 exchange.input.header("CamelMessageTimestamp").is_none(),
737 "CamelMessageTimestamp should not be present when includeMetadata=false"
738 );
739 }
740
741 #[tokio::test]
742 async fn test_include_metadata_true_includes_all_headers() {
743 let component = TimerComponent::new();
744 let endpoint = component
745 .create_endpoint(
746 "timer:full?period=50&repeatCount=1&includeMetadata=true",
747 &NoOpComponentContext,
748 )
749 .unwrap();
750 let mut consumer = endpoint.create_consumer(rt()).unwrap();
751
752 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
753 let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
754
755 tokio::spawn(async move {
756 consumer.start(ctx).await.unwrap();
757 });
758
759 let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
760 .await
761 .expect("should receive exchange")
762 .expect("envelope should exist");
763
764 let exchange = envelope.exchange;
765
766 assert!(exchange.input.header("CamelTimerName").is_some());
767 assert!(exchange.input.header("CamelTimerCounter").is_some());
768 assert!(exchange.input.header("CamelTimerFiredTime").is_some());
769 assert!(exchange.input.header("CamelMessageTimestamp").is_some());
770 }
771
772 #[test]
774 fn test_timer_endpoint_is_pub() {
775 let component = TimerComponent::new();
776 let endpoint = component
777 .create_endpoint("timer:pub-test", &NoOpComponentContext)
778 .unwrap();
779 assert_eq!(endpoint.uri(), "timer:pub-test");
780 }
781}