Skip to main content

camel_component_timer/
lib.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::time;
5use tracing::debug;
6
7use camel_component_api::UriConfig;
8use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
9use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
10
11// ---------------------------------------------------------------------------
12// TimerConfig
13// ---------------------------------------------------------------------------
14
15/// Configuration parsed from a timer URI.
16///
17/// Format: `timer:name?period=1000&delay=0&repeatCount=0`
18#[derive(Debug, Clone, UriConfig)]
19#[uri_scheme = "timer"]
20#[uri_config(crate = "camel_component_api")]
21pub struct TimerConfig {
22    /// Timer name (the path portion of the URI).
23    pub name: String,
24
25    /// Interval between ticks (milliseconds). Default: 1000.
26    #[allow(dead_code)] // Used by macro-generated Duration conversion
27    #[uri_param(name = "period", default = "1000")]
28    period_ms: u64,
29
30    /// Converted Duration for period.
31    pub period: Duration,
32
33    /// Initial delay before the first tick (milliseconds). Default: 0.
34    #[allow(dead_code)] // Used by macro-generated Duration conversion
35    #[uri_param(name = "delay", default = "0")]
36    delay_ms: u64,
37
38    /// Converted Duration for delay.
39    pub delay: Duration,
40
41    /// Maximum number of ticks. `None` means infinite.
42    #[uri_param(name = "repeatCount")]
43    pub repeat_count: Option<u32>,
44}
45
46// ---------------------------------------------------------------------------
47// TimerComponent
48// ---------------------------------------------------------------------------
49
/// Stateless component whose endpoints emit exchanges at a fixed interval.
#[derive(Default)]
pub struct TimerComponent;

impl TimerComponent {
    /// Builds a timer component. Equivalent to [`TimerComponent::default`].
    pub fn new() -> Self {
        TimerComponent
    }
}
64
65impl Component for TimerComponent {
66    fn scheme(&self) -> &str {
67        "timer"
68    }
69
70    fn create_endpoint(
71        &self,
72        uri: &str,
73        _ctx: &dyn camel_component_api::ComponentContext,
74    ) -> Result<Box<dyn Endpoint>, CamelError> {
75        let config = TimerConfig::from_uri(uri)?;
76        Ok(Box::new(TimerEndpoint {
77            uri: uri.to_string(),
78            config,
79        }))
80    }
81}
82
83// ---------------------------------------------------------------------------
84// TimerEndpoint
85// ---------------------------------------------------------------------------
86
87struct TimerEndpoint {
88    uri: String,
89    config: TimerConfig,
90}
91
92impl Endpoint for TimerEndpoint {
93    fn uri(&self) -> &str {
94        &self.uri
95    }
96
97    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
98        Ok(Box::new(TimerConsumer {
99            config: self.config.clone(),
100        }))
101    }
102
103    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
104        Err(CamelError::EndpointCreationFailed(
105            "timer endpoint does not support producers".to_string(),
106        ))
107    }
108}
109
110// ---------------------------------------------------------------------------
111// TimerConsumer
112// ---------------------------------------------------------------------------
113
114struct TimerConsumer {
115    config: TimerConfig,
116}
117
118#[async_trait]
119impl Consumer for TimerConsumer {
120    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
121        let config = self.config.clone();
122
123        // Initial delay (cancellable so shutdown isn't blocked by long delays)
124        if !config.delay.is_zero() {
125            tokio::select! {
126                _ = time::sleep(config.delay) => {}
127                _ = context.cancelled() => {
128                    debug!(timer = config.name, "Timer cancelled during initial delay");
129                    return Ok(());
130                }
131            }
132        }
133
134        let mut interval = time::interval(config.period);
135        let mut count: u32 = 0;
136
137        loop {
138            tokio::select! {
139                _ = context.cancelled() => {
140                    debug!(timer = config.name, "Timer received cancellation, stopping");
141                    break;
142                }
143                _ = interval.tick() => {
144                    count += 1;
145
146                    debug!(timer = config.name, count, "Timer tick");
147
148                    let mut exchange = Exchange::new(Message::new(format!(
149                        "timer://{} tick #{}",
150                        config.name, count
151                    )));
152                    exchange.input.set_header(
153                        "CamelTimerName",
154                        serde_json::Value::String(config.name.clone()),
155                    );
156                    exchange
157                        .input
158                        .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
159
160                    if context.send(exchange).await.is_err() {
161                        // Channel closed, route was stopped
162                        break;
163                    }
164
165                    if let Some(max) = config.repeat_count
166                        && count >= max
167                    {
168                        break;
169                    }
170                }
171            }
172        }
173
174        Ok(())
175    }
176
177    async fn stop(&mut self) -> Result<(), CamelError> {
178        Ok(())
179    }
180}
181
182// ---------------------------------------------------------------------------
183// Tests
184// ---------------------------------------------------------------------------
185
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::NoOpComponentContext;

    /// A bare `timer:<name>` URI yields the default period, delay and repeat count.
    #[test]
    fn test_timer_config_defaults() {
        let parsed = TimerConfig::from_uri("timer:tick").unwrap();

        assert_eq!(parsed.name, "tick");
        assert_eq!(parsed.period, Duration::from_secs(1));
        assert_eq!(parsed.delay, Duration::ZERO);
        assert!(parsed.repeat_count.is_none());
    }

    /// Every supported query parameter is picked up from the URI.
    #[test]
    fn test_timer_config_with_params() {
        let uri = "timer:myTimer?period=500&delay=100&repeatCount=5";
        let parsed = TimerConfig::from_uri(uri).unwrap();

        assert_eq!(parsed.name, "myTimer");
        assert_eq!(parsed.period, Duration::from_millis(500));
        assert_eq!(parsed.delay, Duration::from_millis(100));
        assert_eq!(parsed.repeat_count, Some(5));
    }

    /// A URI with a different scheme is rejected.
    #[test]
    fn test_timer_config_wrong_scheme() {
        assert!(TimerConfig::from_uri("log:info").is_err());
    }

    /// The component reports the `timer` scheme.
    #[test]
    fn test_timer_component_scheme() {
        assert_eq!(TimerComponent::new().scheme(), "timer");
    }

    /// A well-formed timer URI produces an endpoint.
    #[test]
    fn test_timer_component_creates_endpoint() {
        let timer = TimerComponent::new();
        let created = timer.create_endpoint("timer:tick?period=1000", &NoOpComponentContext);
        assert!(created.is_ok());
    }

    /// Asking a timer endpoint for a producer is an error.
    #[test]
    fn test_timer_endpoint_no_producer() {
        let producer_ctx = ProducerContext::new();
        let timer = TimerComponent::new();
        let endpoint = timer
            .create_endpoint("timer:tick", &NoOpComponentContext)
            .unwrap();

        assert!(endpoint.create_producer(&producer_ctx).is_err());
    }

    /// With `repeatCount=3` the consumer delivers three exchanges, the first
    /// of which carries the timer name and a counter of 1.
    #[tokio::test]
    async fn test_timer_consumer_fires() {
        let timer = TimerComponent::new();
        let endpoint = timer
            .create_endpoint("timer:test?period=50&repeatCount=3", &NoOpComponentContext)
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());

        // Drive the consumer on a background task.
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Pull exchanges until three have arrived or the channel closes.
        let mut received = Vec::new();
        while received.len() < 3 {
            match rx.recv().await {
                Some(envelope) => received.push(envelope.exchange),
                None => break,
            }
        }

        assert_eq!(received.len(), 3);

        // Header checks on the first exchange.
        let first = &received[0];
        let expected_name = serde_json::Value::String("test".into());
        let expected_counter = serde_json::Value::Number(1.into());
        assert_eq!(first.input.header("CamelTimerName"), Some(&expected_name));
        assert_eq!(
            first.input.header("CamelTimerCounter"),
            Some(&expected_counter)
        );
    }

    /// Cancelling the token stops an otherwise unbounded timer promptly.
    #[tokio::test]
    async fn test_timer_consumer_respects_cancellation() {
        use tokio_util::sync::CancellationToken;

        let token = CancellationToken::new();
        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone());

        let mut consumer = TimerConsumer {
            config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
        };

        let task = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Give the timer time for a few ticks, then request shutdown.
        tokio::time::sleep(Duration::from_millis(180)).await;
        token.cancel();

        let joined = tokio::time::timeout(Duration::from_secs(1), task).await;
        assert!(
            joined.is_ok(),
            "Consumer should have stopped after cancellation"
        );

        // Drain whatever was delivered before the cancellation took effect.
        let count = std::iter::from_fn(|| rx.try_recv().ok()).count();
        assert!(
            count >= 2,
            "Expected at least 2 exchanges before cancellation, got {count}"
        );
    }
}