Skip to main content

camel_component_timer/
lib.rs

1//! Timer component for rust-camel — fires `Exchange` events on configurable period, delay, and repeat-count schedules.
2//!
3//! Main types: `TimerComponent`, `TimerConsumer`, `TimerConfig`, `TimerEndpoint`.
4//! URI format: `timer:name?period=1000&delay=0&repeatCount=0`.
5//!
6//! # Features
7//!
8//! - **fixedRate**: When enabled, uses skip-missed-tick semantics instead of burst.
9//! - **includeMetadata**: Controls whether timer metadata headers are included in exchanges.
10//! - Double-start protection via `AtomicBool` guard.
11
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use async_trait::async_trait;
16use chrono::Utc;
17use tokio::time;
18use tracing::debug;
19
20use camel_component_api::UriConfig;
21use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
22use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
23
24// ---------------------------------------------------------------------------
25// TimerConfig
26// ---------------------------------------------------------------------------
27
28/// Configuration parsed from a timer URI.
29///
30/// Format: `timer:name?period=1000&delay=0&repeatCount=0&fixedRate=false&includeMetadata=true`
31#[derive(Debug, Clone, UriConfig)]
32#[uri_scheme = "timer"]
33#[uri_config(skip_impl, crate = "camel_component_api")]
34pub struct TimerConfig {
35    /// Timer name (the path portion of the URI).
36    pub name: String,
37
38    /// Interval between ticks (milliseconds). Default: 1000.
39    #[allow(dead_code)] // Used by macro-generated Duration conversion
40    #[uri_param(name = "period", default = "1000")]
41    period_ms: u64,
42
43    /// Converted Duration for period.
44    pub period: Duration,
45
46    /// Initial delay before the first tick (milliseconds). Default: 0.
47    #[allow(dead_code)] // Used by macro-generated Duration conversion
48    #[uri_param(name = "delay", default = "0")]
49    delay_ms: u64,
50
51    /// Converted Duration for delay.
52    pub delay: Duration,
53
54    /// Maximum number of ticks. `None` means infinite.
55    #[uri_param(name = "repeatCount")]
56    pub repeat_count: Option<u32>,
57
58    /// When true, use fixed-rate semantics (skip missed ticks).
59    /// When false (default), use burst semantics (fire all missed ticks immediately).
60    #[uri_param(name = "fixedRate", default = "false")]
61    pub fixed_rate: bool,
62
63    /// When true (default), include metadata headers (CamelTimerFiredTime,
64    /// CamelMessageTimestamp, CamelTimerName) in each exchange.
65    /// When false, send a minimal exchange without metadata headers.
66    #[uri_param(name = "includeMetadata", default = "true")]
67    pub include_metadata: bool,
68}
69
70// Inherent validate — callable as TimerConfig::validate(&self)
71impl TimerConfig {
72    /// Validate the configuration without consuming self.
73    pub fn validate(&self) -> Result<(), CamelError> {
74        if self.name.trim().is_empty() {
75            return Err(CamelError::InvalidUri(
76                "timer name must not be empty".to_string(),
77            ));
78        }
79        if self.period.is_zero() {
80            return Err(CamelError::InvalidUri(
81                "timer period must be greater than 0".to_string(),
82            ));
83        }
84        Ok(())
85    }
86}
87
88impl UriConfig for TimerConfig {
89    fn scheme() -> &'static str {
90        "timer"
91    }
92
93    fn from_uri(uri: &str) -> Result<Self, CamelError> {
94        let parts = camel_component_api::parse_uri(uri)?;
95        Self::from_components(parts)
96    }
97
98    fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
99        let config = Self::parse_uri_components(parts)?;
100        TimerConfig::validate(&config)?;
101        Ok(config)
102    }
103
104    fn validate(self) -> Result<Self, CamelError> {
105        // Delegate to the inherent validate(&self)
106        TimerConfig::validate(&self)?;
107        Ok(self)
108    }
109}
110
111// ---------------------------------------------------------------------------
112// TimerComponent
113// ---------------------------------------------------------------------------
114
115/// The Timer component produces exchanges on a periodic interval.
116pub struct TimerComponent;
117
118impl TimerComponent {
119    pub fn new() -> Self {
120        Self
121    }
122}
123
124impl Default for TimerComponent {
125    fn default() -> Self {
126        Self::new()
127    }
128}
129
130impl Component for TimerComponent {
131    fn scheme(&self) -> &str {
132        "timer"
133    }
134
135    fn create_endpoint(
136        &self,
137        uri: &str,
138        _ctx: &dyn camel_component_api::ComponentContext,
139    ) -> Result<Box<dyn Endpoint>, CamelError> {
140        let config = TimerConfig::from_uri(uri)?;
141        Ok(Box::new(TimerEndpoint {
142            uri: uri.to_string(),
143            config,
144        }))
145    }
146}
147
148// ---------------------------------------------------------------------------
149// TimerEndpoint
150// ---------------------------------------------------------------------------
151
152pub struct TimerEndpoint {
153    uri: String,
154    config: TimerConfig,
155}
156
157impl Endpoint for TimerEndpoint {
158    fn uri(&self) -> &str {
159        &self.uri
160    }
161
162    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
163        Ok(Box::new(TimerConsumer {
164            config: self.config.clone(),
165            started: AtomicBool::new(false),
166        }))
167    }
168
169    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
170        Err(CamelError::EndpointCreationFailed(
171            "timer endpoint does not support producers".to_string(),
172        ))
173    }
174}
175
176// ---------------------------------------------------------------------------
177// TimerConsumer
178// ---------------------------------------------------------------------------
179
180pub struct TimerConsumer {
181    config: TimerConfig,
182    /// Guard against double-start (TIMER-003).
183    started: AtomicBool,
184}
185
186#[async_trait]
187impl Consumer for TimerConsumer {
188    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
189        // TIMER-003: Guard against double-start
190        self.started
191            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
192            .map_err(|_| {
193                CamelError::EndpointCreationFailed("timer consumer already started".to_string())
194            })?;
195
196        TimerConfig::validate(&self.config)?;
197        let config = self.config.clone();
198        let cancel_token = context.cancel_token();
199
200        // Initial delay (cancellable so shutdown isn't blocked by long delays)
201        if !config.delay.is_zero() {
202            tokio::select! {
203                _ = time::sleep(config.delay) => {}
204                _ = cancel_token.cancelled() => {
205                    debug!(timer = config.name, "Timer cancelled during initial delay");
206                    self.started.store(false, Ordering::SeqCst);
207                    return Ok(());
208                }
209            }
210        }
211
212        // If repeat_count is explicitly 0, fire zero times — stop immediately.
213        if config.repeat_count == Some(0) {
214            debug!(timer = config.name, "repeat_count=0, timer will not fire");
215            self.started.store(false, Ordering::SeqCst);
216            return Ok(());
217        }
218
219        let mut interval = time::interval(config.period);
220
221        // TIMER-002: fixedRate controls missed-tick behavior
222        if config.fixed_rate {
223            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
224        } else {
225            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
226        }
227
228        let mut count: u32 = 0;
229
230        loop {
231            tokio::select! {
232                _ = cancel_token.cancelled() => {
233                    debug!(timer = config.name, "Timer received cancellation, stopping");
234                    break;
235                }
236                _ = interval.tick() => {
237                    count += 1;
238
239                    debug!(timer = config.name, count, "Timer tick");
240
241                    let mut exchange = Exchange::new(Message::new(format!(
242                        "timer://{} tick #{}",
243                        config.name, count
244                    )));
245
246                    // TIMER-005 & TIMER-006: include metadata headers when enabled
247                    if config.include_metadata {
248                        exchange.input.set_header(
249                            "CamelTimerName",
250                            serde_json::Value::String(config.name.clone()),
251                        );
252                        exchange
253                            .input
254                            .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
255
256                        // TIMER-005: CamelTimerFiredTime (ISO-8601) and CamelMessageTimestamp (epoch millis)
257                        let now = Utc::now();
258                        exchange.input.set_header(
259                            "CamelTimerFiredTime",
260                            serde_json::Value::String(now.to_rfc3339()),
261                        );
262                        exchange.input.set_header(
263                            "CamelMessageTimestamp",
264                            serde_json::Value::Number(
265                                now.timestamp_millis().into(),
266                            ),
267                        );
268                    }
269
270                    if context.send(exchange).await.is_err() {
271                        // Channel closed, route was stopped
272                        break;
273                    }
274
275                    if let Some(max) = config.repeat_count
276                        && count >= max
277                    {
278                        break;
279                    }
280                }
281            }
282        }
283
284        // Reset started flag so consumer can be restarted after stop
285        self.started.store(false, Ordering::SeqCst);
286        Ok(())
287    }
288
289    async fn stop(&mut self) -> Result<(), CamelError> {
290        self.started.store(false, Ordering::SeqCst);
291        debug!(timer = self.config.name, "timer consumer stopped");
292        Ok(())
293    }
294}
295
296impl TimerConsumer {
297    /// Test helper: pre-set the started flag to simulate an already-running consumer.
298    #[cfg(test)]
299    pub(crate) fn mark_started_for_test(&self) {
300        self.started.store(true, Ordering::SeqCst);
301    }
302}
303
304// ---------------------------------------------------------------------------
305// Tests
306// ---------------------------------------------------------------------------
307
308#[cfg(test)]
309mod tests {
310    use super::*;
311    use camel_component_api::NoOpComponentContext;
312
313    #[test]
314    fn test_zero_period_rejected() {
315        let result = TimerConfig::from_uri("timer:tick?period=0");
316        assert!(result.is_err(), "period=0 should be rejected");
317        let err_msg = result.unwrap_err().to_string();
318        assert!(err_msg.contains("period"), "error should mention 'period'");
319    }
320
321    #[test]
322    fn test_timer_empty_name_rejected() {
323        let result = TimerConfig::from_uri("timer:");
324        assert!(result.is_err());
325        let err = result.unwrap_err().to_string();
326        assert!(err.contains("must not be empty"), "unexpected error: {err}");
327    }
328
329    #[test]
330    fn test_timer_config_defaults() {
331        let config = TimerConfig::from_uri("timer:tick").unwrap();
332        assert_eq!(config.name, "tick");
333        assert_eq!(config.period, Duration::from_millis(1000));
334        assert_eq!(config.delay, Duration::from_millis(0));
335        assert_eq!(config.repeat_count, None);
336    }
337
338    #[test]
339    fn test_timer_config_with_params() {
340        let config =
341            TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
342        assert_eq!(config.name, "myTimer");
343        assert_eq!(config.period, Duration::from_millis(500));
344        assert_eq!(config.delay, Duration::from_millis(100));
345        assert_eq!(config.repeat_count, Some(5));
346    }
347
348    #[test]
349    fn test_timer_config_wrong_scheme() {
350        let result = TimerConfig::from_uri("log:info");
351        assert!(result.is_err());
352    }
353
354    #[test]
355    fn test_timer_component_scheme() {
356        let component = TimerComponent::new();
357        assert_eq!(component.scheme(), "timer");
358    }
359
360    #[test]
361    fn test_timer_component_creates_endpoint() {
362        let component = TimerComponent::new();
363        let endpoint = component.create_endpoint("timer:tick?period=1000", &NoOpComponentContext);
364        assert!(endpoint.is_ok());
365    }
366
367    #[test]
368    fn test_timer_endpoint_no_producer() {
369        let ctx = ProducerContext::new();
370        let component = TimerComponent::new();
371        let endpoint = component
372            .create_endpoint("timer:tick", &NoOpComponentContext)
373            .unwrap();
374        let producer = endpoint.create_producer(&ctx);
375        assert!(producer.is_err());
376    }
377
378    #[test]
379    fn test_rejects_empty_timer_name() {
380        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
381        cfg.name = "".into();
382        assert!(cfg.validate().is_err());
383    }
384
385    #[test]
386    fn test_rejects_zero_period() {
387        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
388        cfg.period = Duration::ZERO;
389        assert!(cfg.validate().is_err());
390    }
391
392    #[test]
393    fn test_valid_config_passes() {
394        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
395        cfg.name = "myTimer".into();
396        cfg.period = Duration::from_millis(1000);
397        assert!(cfg.validate().is_ok());
398    }
399
400    #[tokio::test]
401    async fn test_repeat_count_zero_fires_never() {
402        let component = TimerComponent::new();
403        let endpoint = component
404            .create_endpoint(
405                "timer:zero-test?period=50&repeatCount=0",
406                &NoOpComponentContext,
407            )
408            .unwrap();
409        let mut consumer = endpoint.create_consumer().unwrap();
410
411        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
412        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
413
414        // Start the consumer (spawns internally, returns immediately)
415        consumer.start(ctx).await.unwrap();
416
417        // Wait longer than the period — no messages should arrive
418        tokio::time::sleep(Duration::from_millis(200)).await;
419
420        // Drain any pending messages
421        let mut count = 0;
422        while rx.try_recv().is_ok() {
423            count += 1;
424        }
425        assert_eq!(
426            count, 0,
427            "repeat_count=0 should produce zero fires, got {count}"
428        );
429
430        // Clean up
431        consumer.stop().await.unwrap();
432    }
433
434    #[tokio::test]
435    async fn test_timer_consumer_fires() {
436        let component = TimerComponent::new();
437        let endpoint = component
438            .create_endpoint("timer:test?period=50&repeatCount=3", &NoOpComponentContext)
439            .unwrap();
440        let mut consumer = endpoint.create_consumer().unwrap();
441
442        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
443        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
444
445        // Run consumer in background
446        tokio::spawn(async move {
447            consumer.start(ctx).await.unwrap();
448        });
449
450        // Collect exchanges
451        let mut received = Vec::new();
452        while let Some(envelope) = rx.recv().await {
453            received.push(envelope.exchange);
454            if received.len() == 3 {
455                break;
456            }
457        }
458
459        assert_eq!(received.len(), 3);
460
461        // Verify headers on the first exchange
462        let first = &received[0];
463        assert_eq!(
464            first.input.header("CamelTimerName"),
465            Some(&serde_json::Value::String("test".into()))
466        );
467        assert_eq!(
468            first.input.header("CamelTimerCounter"),
469            Some(&serde_json::Value::Number(1.into()))
470        );
471    }
472
473    #[tokio::test]
474    async fn test_timer_consumer_respects_cancellation() {
475        use tokio_util::sync::CancellationToken;
476
477        let token = CancellationToken::new();
478        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
479        let ctx = ConsumerContext::new(tx, token.clone());
480
481        let mut consumer = TimerConsumer {
482            config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
483            started: AtomicBool::new(false),
484        };
485
486        let handle = tokio::spawn(async move {
487            consumer.start(ctx).await.unwrap();
488        });
489
490        // Let it fire a few times
491        tokio::time::sleep(Duration::from_millis(180)).await;
492        token.cancel();
493
494        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
495        assert!(
496            result.is_ok(),
497            "Consumer should have stopped after cancellation"
498        );
499
500        let mut count = 0;
501        while rx.try_recv().is_ok() {
502            count += 1;
503        }
504        assert!(
505            count >= 2,
506            "Expected at least 2 exchanges before cancellation, got {count}"
507        );
508    }
509
510    #[tokio::test]
511    async fn test_timer_consumer_stop_shuts_down() {
512        let component = TimerComponent::new();
513        let endpoint = component
514            .create_endpoint("timer:stop-test?period=50", &NoOpComponentContext)
515            .unwrap();
516        let mut consumer = endpoint.create_consumer().unwrap();
517
518        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
519        let token = tokio_util::sync::CancellationToken::new();
520        let ctx = ConsumerContext::new(tx, token.clone());
521
522        // Run consumer in background (start() blocks until cancelled)
523        tokio::spawn(async move {
524            consumer.start(ctx).await.unwrap();
525        });
526
527        // Let it fire a few times
528        tokio::time::sleep(Duration::from_millis(180)).await;
529
530        // Drain any pending exchanges
531        let mut count = 0;
532        while rx.try_recv().is_ok() {
533            count += 1;
534        }
535        assert!(count >= 2, "Expected at least 2 exchanges, got {count}");
536
537        // Cancel the token to stop the consumer
538        token.cancel();
539    }
540
541    // TIMER-002: fixedRate config round-trip
542    #[test]
543    fn test_fixed_rate_default_is_false() {
544        let config = TimerConfig::from_uri("timer:tick").unwrap();
545        assert!(!config.fixed_rate, "fixedRate should default to false");
546    }
547
548    #[test]
549    fn test_fixed_rate_parsed_from_uri() {
550        let config = TimerConfig::from_uri("timer:tick?fixedRate=true").unwrap();
551        assert!(
552            config.fixed_rate,
553            "fixedRate should be true when set in URI"
554        );
555    }
556
557    // TIMER-003: double-start guard
558    #[tokio::test]
559    async fn test_double_start_returns_error() {
560        let component = TimerComponent::new();
561        let endpoint = component
562            .create_endpoint(
563                "timer:double?period=50&repeatCount=2",
564                &NoOpComponentContext,
565            )
566            .unwrap(); // allow-unwrap: test setup
567
568        let mut consumer = TimerConsumer {
569            config: TimerConfig {
570                name: "double-test".to_string(),
571                period: Duration::from_millis(100),
572                period_ms: 100,
573                delay: Duration::ZERO,
574                delay_ms: 0,
575                repeat_count: None,
576                fixed_rate: false,
577                include_metadata: true,
578            },
579            started: AtomicBool::new(false),
580        };
581
582        // Simulate the consumer already being started by setting the flag.
583        consumer.mark_started_for_test();
584
585        let (tx, _rx) = tokio::sync::mpsc::channel(16);
586        let cancel_token = tokio_util::sync::CancellationToken::new();
587        let ctx = ConsumerContext::new(tx, cancel_token.clone());
588
589        // Second start on an already-started consumer must return an error.
590        let result = consumer.start(ctx).await;
591        assert!(result.is_err(), "expected double-start to return Err");
592        let err_str = format!("{:?}", result.unwrap_err());
593        assert!(
594            err_str.contains("already started"),
595            "unexpected error: {err_str}"
596        );
597
598        drop(endpoint); // suppress unused-variable warning
599    }
600
601    // TIMER-005: CamelTimerFiredTime and CamelMessageTimestamp headers
602    #[tokio::test]
603    async fn test_timer_fired_time_and_message_timestamp_headers() {
604        let component = TimerComponent::new();
605        let endpoint = component
606            .create_endpoint(
607                "timer:headers?period=50&repeatCount=1",
608                &NoOpComponentContext,
609            )
610            .unwrap();
611        let mut consumer = endpoint.create_consumer().unwrap();
612
613        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
614        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
615
616        tokio::spawn(async move {
617            consumer.start(ctx).await.unwrap();
618        });
619
620        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
621            .await
622            .expect("should receive exchange")
623            .expect("envelope should exist");
624
625        let exchange = envelope.exchange;
626
627        // CamelTimerFiredTime should be an ISO-8601 string
628        let fired_time = exchange
629            .input
630            .header("CamelTimerFiredTime")
631            .expect("CamelTimerFiredTime header should be present");
632        assert!(
633            fired_time.is_string(),
634            "CamelTimerFiredTime should be a string"
635        );
636        let fired_str = fired_time.as_str().unwrap();
637        // Should parse as ISO-8601 / RFC 3339
638        assert!(
639            chrono::DateTime::parse_from_rfc3339(fired_str).is_ok(),
640            "CamelTimerFiredTime should be valid RFC 3339: {fired_str}"
641        );
642
643        // CamelMessageTimestamp should be a number (epoch millis)
644        let msg_ts = exchange
645            .input
646            .header("CamelMessageTimestamp")
647            .expect("CamelMessageTimestamp header should be present");
648        assert!(
649            msg_ts.is_number(),
650            "CamelMessageTimestamp should be a number"
651        );
652        let ts_millis = msg_ts.as_i64().expect("should be i64");
653        assert!(ts_millis > 0, "timestamp should be positive");
654    }
655
656    #[test]
657    fn test_timer_fired_time_header_format() {
658        // Verify the format independently
659        let now = chrono::Utc::now();
660        let rfc = now.to_rfc3339();
661        assert!(chrono::DateTime::parse_from_rfc3339(&rfc).is_ok());
662        let millis = now.timestamp_millis();
663        assert!(millis > 0);
664    }
665
666    // TIMER-006: includeMetadata option
667    #[test]
668    fn test_include_metadata_default_is_true() {
669        let config = TimerConfig::from_uri("timer:tick").unwrap();
670        assert!(
671            config.include_metadata,
672            "includeMetadata should default to true"
673        );
674    }
675
676    #[test]
677    fn test_include_metadata_false_from_uri() {
678        let config = TimerConfig::from_uri("timer:tick?includeMetadata=false").unwrap();
679        assert!(
680            !config.include_metadata,
681            "includeMetadata should be false when set in URI"
682        );
683    }
684
685    #[tokio::test]
686    async fn test_include_metadata_false_omits_headers() {
687        let component = TimerComponent::new();
688        let endpoint = component
689            .create_endpoint(
690                "timer:minimal?period=50&repeatCount=1&includeMetadata=false",
691                &NoOpComponentContext,
692            )
693            .unwrap();
694        let mut consumer = endpoint.create_consumer().unwrap();
695
696        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
697        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
698
699        tokio::spawn(async move {
700            consumer.start(ctx).await.unwrap();
701        });
702
703        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
704            .await
705            .expect("should receive exchange")
706            .expect("envelope should exist");
707
708        let exchange = envelope.exchange;
709
710        // No metadata headers should be present
711        assert!(
712            exchange.input.header("CamelTimerName").is_none(),
713            "CamelTimerName should not be present when includeMetadata=false"
714        );
715        assert!(
716            exchange.input.header("CamelTimerCounter").is_none(),
717            "CamelTimerCounter should not be present when includeMetadata=false"
718        );
719        assert!(
720            exchange.input.header("CamelTimerFiredTime").is_none(),
721            "CamelTimerFiredTime should not be present when includeMetadata=false"
722        );
723        assert!(
724            exchange.input.header("CamelMessageTimestamp").is_none(),
725            "CamelMessageTimestamp should not be present when includeMetadata=false"
726        );
727    }
728
729    #[tokio::test]
730    async fn test_include_metadata_true_includes_all_headers() {
731        let component = TimerComponent::new();
732        let endpoint = component
733            .create_endpoint(
734                "timer:full?period=50&repeatCount=1&includeMetadata=true",
735                &NoOpComponentContext,
736            )
737            .unwrap();
738        let mut consumer = endpoint.create_consumer().unwrap();
739
740        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
741        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
742
743        tokio::spawn(async move {
744            consumer.start(ctx).await.unwrap();
745        });
746
747        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
748            .await
749            .expect("should receive exchange")
750            .expect("envelope should exist");
751
752        let exchange = envelope.exchange;
753
754        assert!(exchange.input.header("CamelTimerName").is_some());
755        assert!(exchange.input.header("CamelTimerCounter").is_some());
756        assert!(exchange.input.header("CamelTimerFiredTime").is_some());
757        assert!(exchange.input.header("CamelMessageTimestamp").is_some());
758    }
759
760    // TIMER-011: TimerEndpoint and TimerConsumer are pub
761    #[test]
762    fn test_timer_endpoint_is_pub() {
763        let component = TimerComponent::new();
764        let endpoint = component
765            .create_endpoint("timer:pub-test", &NoOpComponentContext)
766            .unwrap();
767        assert_eq!(endpoint.uri(), "timer:pub-test");
768    }
769}