Skip to main content

camel_component_timer/
lib.rs

1//! Timer component for rust-camel — fires `Exchange` events on configurable period, delay, and repeat-count schedules.
2//!
3//! Main types: `TimerComponent`, `TimerConsumer`, `TimerConfig`, `TimerEndpoint`.
4//! URI format: `timer:name?period=1000&delay=0&repeatCount=0`.
5//!
6//! # Features
7//!
8//! - **fixedRate**: When enabled, uses skip-missed-tick semantics instead of burst.
9//! - **includeMetadata**: Controls whether timer metadata headers are included in exchanges.
10//! - Double-start protection via `AtomicBool` guard.
11
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use async_trait::async_trait;
16use chrono::Utc;
17use tokio::time;
18use tracing::debug;
19
20use camel_component_api::UriConfig;
21use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
22use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
23
24// ---------------------------------------------------------------------------
25// TimerConfig
26// ---------------------------------------------------------------------------
27
28/// Configuration parsed from a timer URI.
29///
30/// Format: `timer:name?period=1000&delay=0&repeatCount=0&fixedRate=false&includeMetadata=true`
31#[derive(Debug, Clone, UriConfig)]
32#[uri_scheme = "timer"]
33#[uri_config(skip_impl, crate = "camel_component_api")]
34pub struct TimerConfig {
35    /// Timer name (the path portion of the URI).
36    pub name: String,
37
38    /// Interval between ticks (milliseconds). Default: 1000.
39    #[allow(dead_code)] // Used by macro-generated Duration conversion
40    #[uri_param(name = "period", default = "1000")]
41    period_ms: u64,
42
43    /// Converted Duration for period.
44    pub period: Duration,
45
46    /// Initial delay before the first tick (milliseconds). Default: 0.
47    #[allow(dead_code)] // Used by macro-generated Duration conversion
48    #[uri_param(name = "delay", default = "0")]
49    delay_ms: u64,
50
51    /// Converted Duration for delay.
52    pub delay: Duration,
53
54    /// Maximum number of ticks. `None` means infinite.
55    #[uri_param(name = "repeatCount")]
56    pub repeat_count: Option<u32>,
57
58    /// When true, use fixed-rate semantics (skip missed ticks).
59    /// When false (default), use burst semantics (fire all missed ticks immediately).
60    #[uri_param(name = "fixedRate", default = "false")]
61    pub fixed_rate: bool,
62
63    /// When true (default), include metadata headers (CamelTimerFiredTime,
64    /// CamelMessageTimestamp, CamelTimerName) in each exchange.
65    /// When false, send a minimal exchange without metadata headers.
66    #[uri_param(name = "includeMetadata", default = "true")]
67    pub include_metadata: bool,
68}
69
70// Inherent validate — callable as TimerConfig::validate(&self)
71impl TimerConfig {
72    /// Validate the configuration without consuming self.
73    pub fn validate(&self) -> Result<(), CamelError> {
74        if self.name.trim().is_empty() {
75            return Err(CamelError::InvalidUri(
76                "timer name must not be empty".to_string(),
77            ));
78        }
79        if self.period.is_zero() {
80            return Err(CamelError::InvalidUri(
81                "timer period must be greater than 0".to_string(),
82            ));
83        }
84        Ok(())
85    }
86}
87
88impl UriConfig for TimerConfig {
89    fn scheme() -> &'static str {
90        "timer"
91    }
92
93    fn from_uri(uri: &str) -> Result<Self, CamelError> {
94        let parts = camel_component_api::parse_uri(uri)?;
95        Self::from_components(parts)
96    }
97
98    fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
99        let config = Self::parse_uri_components(parts)?;
100        TimerConfig::validate(&config)?;
101        Ok(config)
102    }
103
104    fn validate(self) -> Result<Self, CamelError> {
105        // Delegate to the inherent validate(&self)
106        TimerConfig::validate(&self)?;
107        Ok(self)
108    }
109}
110
111// ---------------------------------------------------------------------------
112// TimerComponent
113// ---------------------------------------------------------------------------
114
115/// The Timer component produces exchanges on a periodic interval.
116pub struct TimerComponent;
117
118impl TimerComponent {
119    pub fn new() -> Self {
120        Self
121    }
122}
123
124impl Default for TimerComponent {
125    fn default() -> Self {
126        Self::new()
127    }
128}
129
130impl Component for TimerComponent {
131    fn scheme(&self) -> &str {
132        "timer"
133    }
134
135    fn create_endpoint(
136        &self,
137        uri: &str,
138        _ctx: &dyn camel_component_api::ComponentContext,
139    ) -> Result<Box<dyn Endpoint>, CamelError> {
140        let config = TimerConfig::from_uri(uri)?;
141        Ok(Box::new(TimerEndpoint {
142            uri: uri.to_string(),
143            config,
144        }))
145    }
146}
147
148// ---------------------------------------------------------------------------
149// TimerEndpoint
150// ---------------------------------------------------------------------------
151
152pub struct TimerEndpoint {
153    uri: String,
154    config: TimerConfig,
155}
156
157impl Endpoint for TimerEndpoint {
158    fn uri(&self) -> &str {
159        &self.uri
160    }
161
162    fn create_consumer(
163        &self,
164        _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
165    ) -> Result<Box<dyn Consumer>, CamelError> {
166        Ok(Box::new(TimerConsumer {
167            config: self.config.clone(),
168            started: AtomicBool::new(false),
169        }))
170    }
171
172    fn create_producer(
173        &self,
174        _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
175        _ctx: &ProducerContext,
176    ) -> Result<BoxProcessor, CamelError> {
177        Err(CamelError::EndpointCreationFailed(
178            "timer endpoint does not support producers".to_string(),
179        ))
180    }
181}
182
183// ---------------------------------------------------------------------------
184// TimerConsumer
185// ---------------------------------------------------------------------------
186
187pub struct TimerConsumer {
188    config: TimerConfig,
189    /// Guard against double-start (TIMER-003).
190    started: AtomicBool,
191}
192
193#[async_trait]
194impl Consumer for TimerConsumer {
195    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
196        // TIMER-003: Guard against double-start
197        self.started
198            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
199            .map_err(|_| {
200                CamelError::EndpointCreationFailed("timer consumer already started".to_string())
201            })?;
202
203        TimerConfig::validate(&self.config)?;
204        let config = self.config.clone();
205        let cancel_token = context.cancel_token();
206
207        // Initial delay (cancellable so shutdown isn't blocked by long delays)
208        if !config.delay.is_zero() {
209            tokio::select! {
210                _ = time::sleep(config.delay) => {}
211                _ = cancel_token.cancelled() => {
212                    debug!(timer = config.name, "Timer cancelled during initial delay");
213                    self.started.store(false, Ordering::SeqCst);
214                    return Ok(());
215                }
216            }
217        }
218
219        // If repeat_count is explicitly 0, fire zero times — stop immediately.
220        if config.repeat_count == Some(0) {
221            debug!(timer = config.name, "repeat_count=0, timer will not fire");
222            self.started.store(false, Ordering::SeqCst);
223            return Ok(());
224        }
225
226        let mut interval = time::interval(config.period);
227
228        // TIMER-002: fixedRate controls missed-tick behavior
229        if config.fixed_rate {
230            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
231        } else {
232            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
233        }
234
235        let mut count: u32 = 0;
236
237        loop {
238            tokio::select! {
239                _ = cancel_token.cancelled() => {
240                    debug!(timer = config.name, "Timer received cancellation, stopping");
241                    break;
242                }
243                _ = interval.tick() => {
244                    count += 1;
245
246                    debug!(timer = config.name, count, "Timer tick");
247
248                    let mut exchange = Exchange::new(Message::new(format!(
249                        "timer://{} tick #{}",
250                        config.name, count
251                    )));
252
253                    // TIMER-005 & TIMER-006: include metadata headers when enabled
254                    if config.include_metadata {
255                        exchange.input.set_header(
256                            "CamelTimerName",
257                            serde_json::Value::String(config.name.clone()),
258                        );
259                        exchange
260                            .input
261                            .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
262
263                        // TIMER-005: CamelTimerFiredTime (ISO-8601) and CamelMessageTimestamp (epoch millis)
264                        let now = Utc::now();
265                        exchange.input.set_header(
266                            "CamelTimerFiredTime",
267                            serde_json::Value::String(now.to_rfc3339()),
268                        );
269                        exchange.input.set_header(
270                            "CamelMessageTimestamp",
271                            serde_json::Value::Number(
272                                now.timestamp_millis().into(),
273                            ),
274                        );
275                    }
276
277                    if context.send(exchange).await.is_err() {
278                        // Channel closed, route was stopped
279                        break;
280                    }
281
282                    if let Some(max) = config.repeat_count
283                        && count >= max
284                    {
285                        break;
286                    }
287                }
288            }
289        }
290
291        // Reset started flag so consumer can be restarted after stop
292        self.started.store(false, Ordering::SeqCst);
293        Ok(())
294    }
295
296    async fn stop(&mut self) -> Result<(), CamelError> {
297        self.started.store(false, Ordering::SeqCst);
298        debug!(timer = self.config.name, "timer consumer stopped");
299        Ok(())
300    }
301}
302
303impl TimerConsumer {
304    /// Test helper: pre-set the started flag to simulate an already-running consumer.
305    #[cfg(test)]
306    pub(crate) fn mark_started_for_test(&self) {
307        self.started.store(true, Ordering::SeqCst);
308    }
309}
310
311// ---------------------------------------------------------------------------
312// Tests
313// ---------------------------------------------------------------------------
314
315#[cfg(test)]
316mod tests {
317    use camel_component_api::test_support::PanicRuntimeObservability;
318    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
319        std::sync::Arc::new(PanicRuntimeObservability)
320    }
321
322    use super::*;
323    use camel_component_api::NoOpComponentContext;
324
325    #[test]
326    fn test_zero_period_rejected() {
327        let result = TimerConfig::from_uri("timer:tick?period=0");
328        assert!(result.is_err(), "period=0 should be rejected");
329        let err_msg = result.unwrap_err().to_string();
330        assert!(err_msg.contains("period"), "error should mention 'period'");
331    }
332
333    #[test]
334    fn test_timer_empty_name_rejected() {
335        let result = TimerConfig::from_uri("timer:");
336        assert!(result.is_err());
337        let err = result.unwrap_err().to_string();
338        assert!(err.contains("must not be empty"), "unexpected error: {err}");
339    }
340
341    #[test]
342    fn test_timer_config_defaults() {
343        let config = TimerConfig::from_uri("timer:tick").unwrap();
344        assert_eq!(config.name, "tick");
345        assert_eq!(config.period, Duration::from_millis(1000));
346        assert_eq!(config.delay, Duration::from_millis(0));
347        assert_eq!(config.repeat_count, None);
348    }
349
350    #[test]
351    fn test_timer_config_with_params() {
352        let config =
353            TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
354        assert_eq!(config.name, "myTimer");
355        assert_eq!(config.period, Duration::from_millis(500));
356        assert_eq!(config.delay, Duration::from_millis(100));
357        assert_eq!(config.repeat_count, Some(5));
358    }
359
360    #[test]
361    fn test_timer_config_wrong_scheme() {
362        let result = TimerConfig::from_uri("log:info");
363        assert!(result.is_err());
364    }
365
366    #[test]
367    fn test_timer_component_scheme() {
368        let component = TimerComponent::new();
369        assert_eq!(component.scheme(), "timer");
370    }
371
372    #[test]
373    fn test_timer_component_creates_endpoint() {
374        let component = TimerComponent::new();
375        let endpoint = component.create_endpoint("timer:tick?period=1000", &NoOpComponentContext);
376        assert!(endpoint.is_ok());
377    }
378
379    #[test]
380    fn test_timer_endpoint_no_producer() {
381        let ctx = ProducerContext::new();
382        let component = TimerComponent::new();
383        let endpoint = component
384            .create_endpoint("timer:tick", &NoOpComponentContext)
385            .unwrap();
386        let producer = endpoint.create_producer(rt(), &ctx);
387        assert!(producer.is_err());
388    }
389
390    #[test]
391    fn test_rejects_empty_timer_name() {
392        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
393        cfg.name = "".into();
394        assert!(cfg.validate().is_err());
395    }
396
397    #[test]
398    fn test_rejects_zero_period() {
399        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
400        cfg.period = Duration::ZERO;
401        assert!(cfg.validate().is_err());
402    }
403
404    #[test]
405    fn test_valid_config_passes() {
406        let mut cfg = TimerConfig::from_uri("timer:tick").unwrap();
407        cfg.name = "myTimer".into();
408        cfg.period = Duration::from_millis(1000);
409        assert!(cfg.validate().is_ok());
410    }
411
412    #[tokio::test]
413    async fn test_repeat_count_zero_fires_never() {
414        let component = TimerComponent::new();
415        let endpoint = component
416            .create_endpoint(
417                "timer:zero-test?period=50&repeatCount=0",
418                &NoOpComponentContext,
419            )
420            .unwrap();
421        let mut consumer = endpoint.create_consumer(rt()).unwrap();
422
423        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
424        let ctx = ConsumerContext::new(
425            tx,
426            tokio_util::sync::CancellationToken::new(),
427            "timer-test-route".to_string(),
428        );
429
430        // Start the consumer (spawns internally, returns immediately)
431        consumer.start(ctx).await.unwrap();
432
433        // Wait longer than the period — no messages should arrive
434        tokio::time::sleep(Duration::from_millis(200)).await;
435
436        // Drain any pending messages
437        let mut count = 0;
438        while rx.try_recv().is_ok() {
439            count += 1;
440        }
441        assert_eq!(
442            count, 0,
443            "repeat_count=0 should produce zero fires, got {count}"
444        );
445
446        // Clean up
447        consumer.stop().await.unwrap();
448    }
449
450    #[tokio::test]
451    async fn test_timer_consumer_fires() {
452        let component = TimerComponent::new();
453        let endpoint = component
454            .create_endpoint("timer:test?period=50&repeatCount=3", &NoOpComponentContext)
455            .unwrap();
456        let mut consumer = endpoint.create_consumer(rt()).unwrap();
457
458        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
459        let ctx = ConsumerContext::new(
460            tx,
461            tokio_util::sync::CancellationToken::new(),
462            "timer-test-route".to_string(),
463        );
464
465        // Run consumer in background
466        tokio::spawn(async move {
467            consumer.start(ctx).await.unwrap();
468        });
469
470        // Collect exchanges
471        let mut received = Vec::new();
472        while let Some(envelope) = rx.recv().await {
473            received.push(envelope.exchange);
474            if received.len() == 3 {
475                break;
476            }
477        }
478
479        assert_eq!(received.len(), 3);
480
481        // Verify headers on the first exchange
482        let first = &received[0];
483        assert_eq!(
484            first.input.header("CamelTimerName"),
485            Some(&serde_json::Value::String("test".into()))
486        );
487        assert_eq!(
488            first.input.header("CamelTimerCounter"),
489            Some(&serde_json::Value::Number(1.into()))
490        );
491    }
492
493    #[tokio::test]
494    async fn test_timer_consumer_respects_cancellation() {
495        use tokio_util::sync::CancellationToken;
496
497        let token = CancellationToken::new();
498        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
499        let ctx = ConsumerContext::new(tx, token.clone(), "timer-test-route".to_string());
500
501        let mut consumer = TimerConsumer {
502            config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
503            started: AtomicBool::new(false),
504        };
505
506        let handle = tokio::spawn(async move {
507            consumer.start(ctx).await.unwrap();
508        });
509
510        // Let it fire a few times
511        tokio::time::sleep(Duration::from_millis(180)).await;
512        token.cancel();
513
514        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
515        assert!(
516            result.is_ok(),
517            "Consumer should have stopped after cancellation"
518        );
519
520        let mut count = 0;
521        while rx.try_recv().is_ok() {
522            count += 1;
523        }
524        assert!(
525            count >= 2,
526            "Expected at least 2 exchanges before cancellation, got {count}"
527        );
528    }
529
530    #[tokio::test]
531    async fn test_timer_consumer_stop_shuts_down() {
532        let component = TimerComponent::new();
533        let endpoint = component
534            .create_endpoint("timer:stop-test?period=50", &NoOpComponentContext)
535            .unwrap();
536        let mut consumer = endpoint.create_consumer(rt()).unwrap();
537
538        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
539        let token = tokio_util::sync::CancellationToken::new();
540        let ctx = ConsumerContext::new(tx, token.clone(), "timer-test-route".to_string());
541
542        // Run consumer in background (start() blocks until cancelled)
543        tokio::spawn(async move {
544            consumer.start(ctx).await.unwrap();
545        });
546
547        // Let it fire a few times
548        tokio::time::sleep(Duration::from_millis(180)).await;
549
550        // Drain any pending exchanges
551        let mut count = 0;
552        while rx.try_recv().is_ok() {
553            count += 1;
554        }
555        assert!(count >= 2, "Expected at least 2 exchanges, got {count}");
556
557        // Cancel the token to stop the consumer
558        token.cancel();
559    }
560
561    // TIMER-002: fixedRate config round-trip
562    #[test]
563    fn test_fixed_rate_default_is_false() {
564        let config = TimerConfig::from_uri("timer:tick").unwrap();
565        assert!(!config.fixed_rate, "fixedRate should default to false");
566    }
567
568    #[test]
569    fn test_fixed_rate_parsed_from_uri() {
570        let config = TimerConfig::from_uri("timer:tick?fixedRate=true").unwrap();
571        assert!(
572            config.fixed_rate,
573            "fixedRate should be true when set in URI"
574        );
575    }
576
577    // TIMER-003: double-start guard
578    #[tokio::test]
579    async fn test_double_start_returns_error() {
580        let component = TimerComponent::new();
581        let endpoint = component
582            .create_endpoint(
583                "timer:double?period=50&repeatCount=2",
584                &NoOpComponentContext,
585            )
586            .unwrap(); // allow-unwrap: test setup
587
588        let mut consumer = TimerConsumer {
589            config: TimerConfig {
590                name: "double-test".to_string(),
591                period: Duration::from_millis(100),
592                period_ms: 100,
593                delay: Duration::ZERO,
594                delay_ms: 0,
595                repeat_count: None,
596                fixed_rate: false,
597                include_metadata: true,
598            },
599            started: AtomicBool::new(false),
600        };
601
602        // Simulate the consumer already being started by setting the flag.
603        consumer.mark_started_for_test();
604
605        let (tx, _rx) = tokio::sync::mpsc::channel(16);
606        let cancel_token = tokio_util::sync::CancellationToken::new();
607        let ctx = ConsumerContext::new(tx, cancel_token.clone(), "timer-test-route".to_string());
608
609        // Second start on an already-started consumer must return an error.
610        let result = consumer.start(ctx).await;
611        assert!(result.is_err(), "expected double-start to return Err");
612        let err_str = format!("{:?}", result.unwrap_err());
613        assert!(
614            err_str.contains("already started"),
615            "unexpected error: {err_str}"
616        );
617
618        drop(endpoint); // suppress unused-variable warning
619    }
620
621    // TIMER-005: CamelTimerFiredTime and CamelMessageTimestamp headers
622    #[tokio::test]
623    async fn test_timer_fired_time_and_message_timestamp_headers() {
624        let component = TimerComponent::new();
625        let endpoint = component
626            .create_endpoint(
627                "timer:headers?period=50&repeatCount=1",
628                &NoOpComponentContext,
629            )
630            .unwrap();
631        let mut consumer = endpoint.create_consumer(rt()).unwrap();
632
633        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
634        let ctx = ConsumerContext::new(
635            tx,
636            tokio_util::sync::CancellationToken::new(),
637            "timer-test-route".to_string(),
638        );
639
640        tokio::spawn(async move {
641            consumer.start(ctx).await.unwrap();
642        });
643
644        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
645            .await
646            .expect("should receive exchange")
647            .expect("envelope should exist");
648
649        let exchange = envelope.exchange;
650
651        // CamelTimerFiredTime should be an ISO-8601 string
652        let fired_time = exchange
653            .input
654            .header("CamelTimerFiredTime")
655            .expect("CamelTimerFiredTime header should be present");
656        assert!(
657            fired_time.is_string(),
658            "CamelTimerFiredTime should be a string"
659        );
660        let fired_str = fired_time.as_str().unwrap();
661        // Should parse as ISO-8601 / RFC 3339
662        assert!(
663            chrono::DateTime::parse_from_rfc3339(fired_str).is_ok(),
664            "CamelTimerFiredTime should be valid RFC 3339: {fired_str}"
665        );
666
667        // CamelMessageTimestamp should be a number (epoch millis)
668        let msg_ts = exchange
669            .input
670            .header("CamelMessageTimestamp")
671            .expect("CamelMessageTimestamp header should be present");
672        assert!(
673            msg_ts.is_number(),
674            "CamelMessageTimestamp should be a number"
675        );
676        let ts_millis = msg_ts.as_i64().expect("should be i64");
677        assert!(ts_millis > 0, "timestamp should be positive");
678    }
679
680    #[test]
681    fn test_timer_fired_time_header_format() {
682        // Verify the format independently
683        let now = chrono::Utc::now();
684        let rfc = now.to_rfc3339();
685        assert!(chrono::DateTime::parse_from_rfc3339(&rfc).is_ok());
686        let millis = now.timestamp_millis();
687        assert!(millis > 0);
688    }
689
690    // TIMER-006: includeMetadata option
691    #[test]
692    fn test_include_metadata_default_is_true() {
693        let config = TimerConfig::from_uri("timer:tick").unwrap();
694        assert!(
695            config.include_metadata,
696            "includeMetadata should default to true"
697        );
698    }
699
700    #[test]
701    fn test_include_metadata_false_from_uri() {
702        let config = TimerConfig::from_uri("timer:tick?includeMetadata=false").unwrap();
703        assert!(
704            !config.include_metadata,
705            "includeMetadata should be false when set in URI"
706        );
707    }
708
709    #[tokio::test]
710    async fn test_include_metadata_false_omits_headers() {
711        let component = TimerComponent::new();
712        let endpoint = component
713            .create_endpoint(
714                "timer:minimal?period=50&repeatCount=1&includeMetadata=false",
715                &NoOpComponentContext,
716            )
717            .unwrap();
718        let mut consumer = endpoint.create_consumer(rt()).unwrap();
719
720        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
721        let ctx = ConsumerContext::new(
722            tx,
723            tokio_util::sync::CancellationToken::new(),
724            "timer-test-route".to_string(),
725        );
726
727        tokio::spawn(async move {
728            consumer.start(ctx).await.unwrap();
729        });
730
731        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
732            .await
733            .expect("should receive exchange")
734            .expect("envelope should exist");
735
736        let exchange = envelope.exchange;
737
738        // No metadata headers should be present
739        assert!(
740            exchange.input.header("CamelTimerName").is_none(),
741            "CamelTimerName should not be present when includeMetadata=false"
742        );
743        assert!(
744            exchange.input.header("CamelTimerCounter").is_none(),
745            "CamelTimerCounter should not be present when includeMetadata=false"
746        );
747        assert!(
748            exchange.input.header("CamelTimerFiredTime").is_none(),
749            "CamelTimerFiredTime should not be present when includeMetadata=false"
750        );
751        assert!(
752            exchange.input.header("CamelMessageTimestamp").is_none(),
753            "CamelMessageTimestamp should not be present when includeMetadata=false"
754        );
755    }
756
757    #[tokio::test]
758    async fn test_include_metadata_true_includes_all_headers() {
759        let component = TimerComponent::new();
760        let endpoint = component
761            .create_endpoint(
762                "timer:full?period=50&repeatCount=1&includeMetadata=true",
763                &NoOpComponentContext,
764            )
765            .unwrap();
766        let mut consumer = endpoint.create_consumer(rt()).unwrap();
767
768        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
769        let ctx = ConsumerContext::new(
770            tx,
771            tokio_util::sync::CancellationToken::new(),
772            "timer-test-route".to_string(),
773        );
774
775        tokio::spawn(async move {
776            consumer.start(ctx).await.unwrap();
777        });
778
779        let envelope = tokio::time::timeout(Duration::from_secs(2), rx.recv())
780            .await
781            .expect("should receive exchange")
782            .expect("envelope should exist");
783
784        let exchange = envelope.exchange;
785
786        assert!(exchange.input.header("CamelTimerName").is_some());
787        assert!(exchange.input.header("CamelTimerCounter").is_some());
788        assert!(exchange.input.header("CamelTimerFiredTime").is_some());
789        assert!(exchange.input.header("CamelMessageTimestamp").is_some());
790    }
791
792    // TIMER-011: TimerEndpoint and TimerConsumer are pub
793    #[test]
794    fn test_timer_endpoint_is_pub() {
795        let component = TimerComponent::new();
796        let endpoint = component
797            .create_endpoint("timer:pub-test", &NoOpComponentContext)
798            .unwrap();
799        assert_eq!(endpoint.uri(), "timer:pub-test");
800    }
801}