Skip to main content

camel_component_timer/
lib.rs

//! Timer component for rust-camel — fires `Exchange` events on configurable period, delay, and repeat-count schedules.
//!
//! Main types: `TimerComponent`, `TimerConsumer`, `TimerConfig`, `TimerEndpoint`.
//! URI format: `timer:name?period=1000&delay=0&repeatCount=5`.
//! All query parameters are optional. Omit `repeatCount` for an unbounded timer;
//! `repeatCount=0` makes the timer fire zero times.
//!
//! # Features
//!
//! - **fixedRate**: When enabled, uses skip-missed-tick semantics instead of burst.
//! - **includeMetadata**: Controls whether timer metadata headers are included in exchanges.
//! - Double-start protection via `AtomicBool` guard.
11
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use async_trait::async_trait;
16use chrono::Utc;
17use tokio::time;
18use tracing::debug;
19
20use camel_component_api::UriConfig;
21use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
22use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
23
24// ---------------------------------------------------------------------------
25// TimerConfig
26// ---------------------------------------------------------------------------
27
28/// Configuration parsed from a timer URI.
29///
30/// Format: `timer:name?period=1000&delay=0&repeatCount=0&fixedRate=false&includeMetadata=true`
31#[derive(Debug, Clone, UriConfig)]
32#[uri_scheme = "timer"]
33#[uri_config(skip_impl, crate = "camel_component_api")]
34pub struct TimerConfig {
35    /// Timer name (the path portion of the URI).
36    pub name: String,
37
38    /// Interval between ticks (milliseconds). Default: 1000.
39    #[allow(dead_code)] // Used by macro-generated Duration conversion
40    #[uri_param(name = "period", default = "1000")]
41    period_ms: u64,
42
43    /// Converted Duration for period.
44    pub period: Duration,
45
46    /// Initial delay before the first tick (milliseconds). Default: 0.
47    #[allow(dead_code)] // Used by macro-generated Duration conversion
48    #[uri_param(name = "delay", default = "0")]
49    delay_ms: u64,
50
51    /// Converted Duration for delay.
52    pub delay: Duration,
53
54    /// Maximum number of ticks. `None` means infinite.
55    #[uri_param(name = "repeatCount")]
56    pub repeat_count: Option<u32>,
57
58    /// When true, use fixed-rate semantics (skip missed ticks).
59    /// When false (default), use burst semantics (fire all missed ticks immediately).
60    #[uri_param(name = "fixedRate", default = "false")]
61    pub fixed_rate: bool,
62
63    /// When true (default), include metadata headers (CamelTimerFiredTime,
64    /// CamelMessageTimestamp, CamelTimerName) in each exchange.
65    /// When false, send a minimal exchange without metadata headers.
66    #[uri_param(name = "includeMetadata", default = "true")]
67    pub include_metadata: bool,
68}
69
70// Inherent validate — callable as TimerConfig::validate(&self)
71impl TimerConfig {
72    /// Validate the configuration without consuming self.
73    pub fn validate(&self) -> Result<(), CamelError> {
74        if self.name.trim().is_empty() {
75            return Err(CamelError::InvalidUri(
76                "timer name must not be empty".to_string(),
77            ));
78        }
79        if self.period.is_zero() {
80            return Err(CamelError::InvalidUri(
81                "timer period must be greater than 0".to_string(),
82            ));
83        }
84        Ok(())
85    }
86}
87
88impl UriConfig for TimerConfig {
89    fn scheme() -> &'static str {
90        "timer"
91    }
92
93    fn from_uri(uri: &str) -> Result<Self, CamelError> {
94        let parts = camel_component_api::parse_uri(uri)?;
95        Self::from_components(parts)
96    }
97
98    fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
99        let config = Self::parse_uri_components(parts)?;
100        TimerConfig::validate(&config)?;
101        Ok(config)
102    }
103
104    fn validate(self) -> Result<Self, CamelError> {
105        // Delegate to the inherent validate(&self)
106        TimerConfig::validate(&self)?;
107        Ok(self)
108    }
109}
110
111// ---------------------------------------------------------------------------
112// TimerComponent
113// ---------------------------------------------------------------------------
114
/// The Timer component produces exchanges on a periodic interval.
///
/// This is a unit struct with no fields, so the standard traits are derived
/// rather than written by hand.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TimerComponent;

impl TimerComponent {
    /// Creates the timer component; equivalent to `TimerComponent::default()`.
    pub fn new() -> Self {
        Self
    }
}
129
130impl Component for TimerComponent {
131    fn scheme(&self) -> &str {
132        "timer"
133    }
134
135    fn create_endpoint(
136        &self,
137        uri: &str,
138        _ctx: &dyn camel_component_api::ComponentContext,
139    ) -> Result<Box<dyn Endpoint>, CamelError> {
140        let config = TimerConfig::from_uri(uri)?;
141        Ok(Box::new(TimerEndpoint {
142            uri: uri.to_string(),
143            config,
144        }))
145    }
146}
147
148// ---------------------------------------------------------------------------
149// TimerEndpoint
150// ---------------------------------------------------------------------------
151
152pub struct TimerEndpoint {
153    uri: String,
154    config: TimerConfig,
155}
156
157impl Endpoint for TimerEndpoint {
158    fn uri(&self) -> &str {
159        &self.uri
160    }
161
162    fn create_consumer(
163        &self,
164        _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
165    ) -> Result<Box<dyn Consumer>, CamelError> {
166        Ok(Box::new(TimerConsumer {
167            config: self.config.clone(),
168            started: AtomicBool::new(false),
169        }))
170    }
171
172    fn create_producer(
173        &self,
174        _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
175        _ctx: &ProducerContext,
176    ) -> Result<BoxProcessor, CamelError> {
177        Err(CamelError::EndpointCreationFailed(
178            "timer endpoint does not support producers".to_string(),
179        ))
180    }
181}
182
183// ---------------------------------------------------------------------------
184// TimerConsumer
185// ---------------------------------------------------------------------------
186
187pub struct TimerConsumer {
188    config: TimerConfig,
189    /// Guard against double-start (TIMER-003).
190    started: AtomicBool,
191}
192
193#[async_trait]
194impl Consumer for TimerConsumer {
195    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
196        // TIMER-003: Guard against double-start
197        self.started
198            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
199            .map_err(|_| {
200                CamelError::EndpointCreationFailed("timer consumer already started".to_string())
201            })?;
202
203        TimerConfig::validate(&self.config)?;
204        let config = self.config.clone();
205        let cancel_token = context.cancel_token();
206
207        // Initial delay (cancellable so shutdown isn't blocked by long delays)
208        if !config.delay.is_zero() {
209            tokio::select! {
210                _ = time::sleep(config.delay) => {}
211                _ = cancel_token.cancelled() => {
212                    debug!(timer = config.name, "Timer cancelled during initial delay");
213                    self.started.store(false, Ordering::SeqCst);
214                    return Ok(());
215                }
216            }
217        }
218
219        // If repeat_count is explicitly 0, fire zero times — stop immediately.
220        if config.repeat_count == Some(0) {
221            debug!(timer = config.name, "repeat_count=0, timer will not fire");
222            self.started.store(false, Ordering::SeqCst);
223            return Ok(());
224        }
225
226        let mut interval = time::interval(config.period);
227
228        // TIMER-002: fixedRate controls missed-tick behavior
229        if config.fixed_rate {
230            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
231        } else {
232            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
233        }
234
235        let mut count: u32 = 0;
236
237        loop {
238            tokio::select! {
239                _ = cancel_token.cancelled() => {
240                    debug!(timer = config.name, "Timer received cancellation, stopping");
241                    break;
242                }
243                _ = interval.tick() => {
244                    count += 1;
245
246                    debug!(timer = config.name, count, "Timer tick");
247
248                    let mut exchange = Exchange::new(Message::new(format!(
249                        "timer://{} tick #{}",
250                        config.name, count
251                    )));
252
253                    // TIMER-005 & TIMER-006: include metadata headers when enabled
254                    if config.include_metadata {
255                        exchange.input.set_header(
256                            "CamelTimerName",
257                            serde_json::Value::String(config.name.clone()),
258                        );
259                        exchange
260                            .input
261                            .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
262
263                        // TIMER-005: CamelTimerFiredTime (ISO-8601) and CamelMessageTimestamp (epoch millis)
264                        let now = Utc::now();
265                        exchange.input.set_header(
266                            "CamelTimerFiredTime",
267                            serde_json::Value::String(now.to_rfc3339()),
268                        );
269                        exchange.input.set_header(
270                            "CamelMessageTimestamp",
271                            serde_json::Value::Number(
272                                now.timestamp_millis().into(),
273                            ),
274                        );
275                    }
276
277                    if context.send(exchange).await.is_err() {
278                        // Channel closed, route was stopped
279                        break;
280                    }
281
282                    if let Some(max) = config.repeat_count
283                        && count >= max
284                    {
285                        break;
286                    }
287                }
288            }
289        }
290
291        // Reset started flag so consumer can be restarted after stop
292        self.started.store(false, Ordering::SeqCst);
293        Ok(())
294    }
295
296    async fn stop(&mut self) -> Result<(), CamelError> {
297        self.started.store(false, Ordering::SeqCst);
298        debug!(timer = self.config.name, "timer consumer stopped");
299        Ok(())
300    }
301}
302
303impl TimerConsumer {
304    /// Test helper: pre-set the started flag to simulate an already-running consumer.
305    #[cfg(test)]
306    pub(crate) fn mark_started_for_test(&self) {
307        self.started.store(true, Ordering::SeqCst);
308    }
309}
310
311// ---------------------------------------------------------------------------
312// Tests
313// ---------------------------------------------------------------------------
314
315#[cfg(test)]
316mod tests {
317    use camel_component_api::test_support::PanicRuntimeObservability;
318    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
319        std::sync::Arc::new(PanicRuntimeObservability)
320    }
321
322    use super::*;
323    use camel_component_api::NoOpComponentContext;
324
325    #[test]
326    fn test_zero_period_rejected() {
327        let result = TimerConfig::from_uri("timer:tick?period=0");
328        assert!(result.is_err(), "period=0 should be rejected");
329        let err_msg = result.unwrap_err().to_string();
330        assert!(err_msg.contains("period"), "error should mention 'period'");
331    }
332
333    #[test]
334    fn test_timer_empty_name_rejected() {
335        let result = TimerConfig::from_uri("timer:");
336        assert!(result.is_err());
337        let err = result.unwrap_err().to_string();
338        assert!(err.contains("must not be empty"), "unexpected error: {err}");
339    }
340
341    #[test]
342    fn test_timer_config_defaults() {
343        let config = TimerConfig::from_uri("timer:tick").unwrap();
344        assert_eq!(config.name, "tick");
345        assert_eq!(config.period, Duration::from_millis(1000));
346        assert_eq!(config.delay, Duration::from_millis(0));
347        assert_eq!(config.repeat_count, None);
348    }
349
350    #[test]
351    fn test_timer_config_with_params() {
352        let config =
353            TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
354        assert_eq!(config.name, "myTimer");
355        assert_eq!(config.period, Duration::from_millis(500));
356        assert_eq!(config.delay, Duration::from_millis(100));
357        assert_eq!(config.repeat_count, Some(5));
358    }
359
360    #[test]
361    fn test_timer_config_wrong_scheme() {
362        let result = TimerConfig::from_uri("log:info");
363        assert!(result.is_err());
364    }
365
366    #[test]
367    fn test_timer_component_scheme() {
368        let component = TimerComponent::new();
369        assert_eq!(component.scheme(), "timer");
370    }
371
372    #[test]
373    fn test_timer_component_creates_endpoint() {
374        let component = TimerComponent::new();
375        let endpoint = component.create_endpoint("timer:tick?period=1000", &NoOpComponentContext);
376        assert!(endpoint.is_ok());
377    }
378
379    #[test]
380    fn test_timer_endpoint_no_producer() {
381        let ctx = ProducerContext::new();
382        let component = TimerComponent::new();
383        let endpoint = component
384            .create_endpoint("timer:tick", &NoOpComponentContext)
385            .unwrap();
386        let producer = endpoint.create_producer(rt(), &ctx);
387        assert!(producer.is_err());
388    }
389
390    #[test]
391    fn test_rejects_empty_timer_name() {
392        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
393        cfg.name = "".into();
394        assert!(cfg.validate().is_err());
395    }
396
397    #[test]
398    fn test_rejects_zero_period() {
399        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
400        cfg.period = Duration::ZERO;
401        assert!(cfg.validate().is_err());
402    }
403
404    #[test]
405    fn test_valid_config_passes() {
406        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
407        cfg.name = "myTimer".into();
408        cfg.period = Duration::from_millis(1000);
409        assert!(cfg.validate().is_ok());
410    }
411
412    #[tokio::test]
413    async fn test_repeat_count_zero_fires_never() {
414        let component = TimerComponent::new();
415        let endpoint = component
416            .create_endpoint(
417                "timer:zero-test?period=50&repeatCount=0",
418                &NoOpComponentContext,
419            )
420            .unwrap();
421        let mut consumer = endpoint.create_consumer(rt()).unwrap();
422
423        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
424        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
425
426        // Start the consumer (spawns internally, returns immediately)
427        consumer.start(ctx).await.unwrap();
428
429        // Wait longer than the period — no messages should arrive
430        tokio::time::sleep(Duration::from_millis(200)).await;
431
432        // Drain any pending messages
433        let mut count = 0;
434        while rx.try_recv().is_ok() {
435            count += 1;
436        }
437        assert_eq!(
438            count, 0,
439            "repeat_count=0 should produce zero fires, got {count}"
440        );
441
442        // Clean up
443        consumer.stop().await.unwrap();
444    }
445
446    #[tokio::test]
447    async fn test_timer_consumer_fires() {
448        let component = TimerComponent::new();
449        let endpoint = component
450            .create_endpoint("timer:test?period=50&repeatCount=3", &NoOpComponentContext)
451            .unwrap();
452        let mut consumer = endpoint.create_consumer(rt()).unwrap();
453
454        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
455        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
456
457        // Run consumer in background
458        tokio::spawn(async move {
459            consumer.start(ctx).await.unwrap();
460        });
461
462        // Collect exchanges
463        let mut received = Vec::new();
464        while let Some(envelope) = rx.recv().await {
465            received.push(envelope.exchange);
466            if received.len() == 3 {
467                break;
468            }
469        }
470
471        assert_eq!(received.len(), 3);
472
473        // Verify headers on the first exchange
474        let first = &received[0];
475        assert_eq!(
476            first.input.header("CamelTimerName"),
477            Some(&serde_json::Value::String("test".into()))
478        );
479        assert_eq!(
480            first.input.header("CamelTimerCounter"),
481            Some(&serde_json::Value::Number(1.into()))
482        );
483    }
484
485    #[tokio::test]
486    async fn test_timer_consumer_respects_cancellation() {
487        use tokio_util::sync::CancellationToken;
488
489        let token = CancellationToken::new();
490        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
491        let ctx = ConsumerContext::new(tx, token.clone());
492
493        let mut consumer = TimerConsumer {
494            config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
495            started: AtomicBool::new(false),
496        };
497
498        let handle = tokio::spawn(async move {
499            consumer.start(ctx).await.unwrap();
500        });
501
502        // Let it fire a few times
503        tokio::time::sleep(Duration::from_millis(180)).await;
504        token.cancel();
505
506        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
507        assert!(
508            result.is_ok(),
509            "Consumer should have stopped after cancellation"
510        );
511
512        let mut count = 0;
513        while rx.try_recv().is_ok() {
514            count += 1;
515        }
516        assert!(
517            count >= 2,
518            "Expected at least 2 exchanges before cancellation, got {count}"
519        );
520    }
521
522    #[tokio::test]
523    async fn test_timer_consumer_stop_shuts_down() {
524        let component = TimerComponent::new();
525        let endpoint = component
526            .create_endpoint("timer:stop-test?period=50", &NoOpComponentContext)
527            .unwrap();
528        let mut consumer = endpoint.create_consumer(rt()).unwrap();
529
530        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
531        let token = tokio_util::sync::CancellationToken::new();
532        let ctx = ConsumerContext::new(tx, token.clone());
533
534        // Run consumer in background (start() blocks until cancelled)
535        tokio::spawn(async move {
536            consumer.start(ctx).await.unwrap();
537        });
538
539        // Let it fire a few times
540        tokio::time::sleep(Duration::from_millis(180)).await;
541
542        // Drain any pending exchanges
543        let mut count = 0;
544        while rx.try_recv().is_ok() {
545            count += 1;
546        }
547        assert!(count >= 2, "Expected at least 2 exchanges, got {count}");
548
549        // Cancel the token to stop the consumer
550        token.cancel();
551    }
552
553    // TIMER-002: fixedRate config round-trip
554    #[test]
555    fn test_fixed_rate_default_is_false() {
556        let config = TimerConfig::from_uri("timer:tick").unwrap();
557        assert!(!config.fixed_rate, "fixedRate should default to false");
558    }
559
560    #[test]
561    fn test_fixed_rate_parsed_from_uri() {
562        let config = TimerConfig::from_uri("timer:tick?fixedRate=true").unwrap();
563        assert!(
564            config.fixed_rate,
565            "fixedRate should be true when set in URI"
566        );
567    }
568
569    // TIMER-003: double-start guard
570    #[tokio::test]
571    async fn test_double_start_returns_error() {
572        let component = TimerComponent::new();
573        let endpoint = component
574            .create_endpoint(
575                "timer:double?period=50&repeatCount=2",
576                &NoOpComponentContext,
577            )
578            .unwrap(); // allow-unwrap: test setup
579
580        let mut consumer = TimerConsumer {
581            config: TimerConfig {
582                name: "double-test".to_string(),
583                period: Duration::from_millis(100),
584                period_ms: 100,
585                delay: Duration::ZERO,
586                delay_ms: 0,
587                repeat_count: None,
588                fixed_rate: false,
589                include_metadata: true,
590            },
591            started: AtomicBool::new(false),
592        };
593
594        // Simulate the consumer already being started by setting the flag.
595        consumer.mark_started_for_test();
596
597        let (tx, _rx) = tokio::sync::mpsc::channel(16);
598        let cancel_token = tokio_util::sync::CancellationToken::new();
599        let ctx = ConsumerContext::new(tx, cancel_token.clone());
600
601        // Second start on an already-started consumer must return an error.
602        let result = consumer.start(ctx).await;
603        assert!(result.is_err(), "expected double-start to return Err");
604        let err_str = format!("{:?}", result.unwrap_err());
605        assert!(
606            err_str.contains("already started"),
607            "unexpected error: {err_str}"
608        );
609
610        drop(endpoint); // suppress unused-variable warning
611    }
612
613    // TIMER-005: CamelTimerFiredTime and CamelMessageTimestamp headers
614    #[tokio::test]
615    async fn test_timer_fired_time_and_message_timestamp_headers() {
616        let component = TimerComponent::new();
617        let endpoint = component
618            .create_endpoint(
619                "timer:headers?period=50&repeatCount=1",
620                &NoOpComponentContext,
621            )
622            .unwrap();
623        let mut consumer = endpoint.create_consumer(rt()).unwrap();
624
625        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
626        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
627
628        tokio::spawn(async move {
629            consumer.start(ctx).await.unwrap();
630        });
631
632        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
633            .await
634            .expect("should receive exchange")
635            .expect("envelope should exist");
636
637        let exchange = envelope.exchange;
638
639        // CamelTimerFiredTime should be an ISO-8601 string
640        let fired_time = exchange
641            .input
642            .header("CamelTimerFiredTime")
643            .expect("CamelTimerFiredTime header should be present");
644        assert!(
645            fired_time.is_string(),
646            "CamelTimerFiredTime should be a string"
647        );
648        let fired_str = fired_time.as_str().unwrap();
649        // Should parse as ISO-8601 / RFC 3339
650        assert!(
651            chrono::DateTime::parse_from_rfc3339(fired_str).is_ok(),
652            "CamelTimerFiredTime should be valid RFC 3339: {fired_str}"
653        );
654
655        // CamelMessageTimestamp should be a number (epoch millis)
656        let msg_ts = exchange
657            .input
658            .header("CamelMessageTimestamp")
659            .expect("CamelMessageTimestamp header should be present");
660        assert!(
661            msg_ts.is_number(),
662            "CamelMessageTimestamp should be a number"
663        );
664        let ts_millis = msg_ts.as_i64().expect("should be i64");
665        assert!(ts_millis > 0, "timestamp should be positive");
666    }
667
668    #[test]
669    fn test_timer_fired_time_header_format() {
670        // Verify the format independently
671        let now = chrono::Utc::now();
672        let rfc = now.to_rfc3339();
673        assert!(chrono::DateTime::parse_from_rfc3339(&rfc).is_ok());
674        let millis = now.timestamp_millis();
675        assert!(millis > 0);
676    }
677
678    // TIMER-006: includeMetadata option
679    #[test]
680    fn test_include_metadata_default_is_true() {
681        let config = TimerConfig::from_uri("timer:tick").unwrap();
682        assert!(
683            config.include_metadata,
684            "includeMetadata should default to true"
685        );
686    }
687
688    #[test]
689    fn test_include_metadata_false_from_uri() {
690        let config = TimerConfig::from_uri("timer:tick?includeMetadata=false").unwrap();
691        assert!(
692            !config.include_metadata,
693            "includeMetadata should be false when set in URI"
694        );
695    }
696
697    #[tokio::test]
698    async fn test_include_metadata_false_omits_headers() {
699        let component = TimerComponent::new();
700        let endpoint = component
701            .create_endpoint(
702                "timer:minimal?period=50&repeatCount=1&includeMetadata=false",
703                &NoOpComponentContext,
704            )
705            .unwrap();
706        let mut consumer = endpoint.create_consumer(rt()).unwrap();
707
708        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
709        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
710
711        tokio::spawn(async move {
712            consumer.start(ctx).await.unwrap();
713        });
714
715        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
716            .await
717            .expect("should receive exchange")
718            .expect("envelope should exist");
719
720        let exchange = envelope.exchange;
721
722        // No metadata headers should be present
723        assert!(
724            exchange.input.header("CamelTimerName").is_none(),
725            "CamelTimerName should not be present when includeMetadata=false"
726        );
727        assert!(
728            exchange.input.header("CamelTimerCounter").is_none(),
729            "CamelTimerCounter should not be present when includeMetadata=false"
730        );
731        assert!(
732            exchange.input.header("CamelTimerFiredTime").is_none(),
733            "CamelTimerFiredTime should not be present when includeMetadata=false"
734        );
735        assert!(
736            exchange.input.header("CamelMessageTimestamp").is_none(),
737            "CamelMessageTimestamp should not be present when includeMetadata=false"
738        );
739    }
740
741    #[tokio::test]
742    async fn test_include_metadata_true_includes_all_headers() {
743        let component = TimerComponent::new();
744        let endpoint = component
745            .create_endpoint(
746                "timer:full?period=50&repeatCount=1&includeMetadata=true",
747                &NoOpComponentContext,
748            )
749            .unwrap();
750        let mut consumer = endpoint.create_consumer(rt()).unwrap();
751
752        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
753        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
754
755        tokio::spawn(async move {
756            consumer.start(ctx).await.unwrap();
757        });
758
759        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
760            .await
761            .expect("should receive exchange")
762            .expect("envelope should exist");
763
764        let exchange = envelope.exchange;
765
766        assert!(exchange.input.header("CamelTimerName").is_some());
767        assert!(exchange.input.header("CamelTimerCounter").is_some());
768        assert!(exchange.input.header("CamelTimerFiredTime").is_some());
769        assert!(exchange.input.header("CamelMessageTimestamp").is_some());
770    }
771
772    // TIMER-011: TimerEndpoint and TimerConsumer are pub
773    #[test]
774    fn test_timer_endpoint_is_pub() {
775        let component = TimerComponent::new();
776        let endpoint = component
777            .create_endpoint("timer:pub-test", &NoOpComponentContext)
778            .unwrap();
779        assert_eq!(endpoint.uri(), "timer:pub-test");
780    }
781}