Skip to main content

camel_component_timer/
lib.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::time;
5use tracing::debug;
6
7use camel_component_api::UriConfig;
8use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
9use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
10
11// ---------------------------------------------------------------------------
12// TimerConfig
13// ---------------------------------------------------------------------------
14
15/// Configuration parsed from a timer URI.
16///
17/// Format: `timer:name?period=1000&delay=0&repeatCount=0`
18#[derive(Debug, Clone, UriConfig)]
19#[uri_scheme = "timer"]
20#[uri_config(crate = "camel_component_api")]
21pub struct TimerConfig {
22    /// Timer name (the path portion of the URI).
23    pub name: String,
24
25    /// Interval between ticks (milliseconds). Default: 1000.
26    #[allow(dead_code)] // Used by macro-generated Duration conversion
27    #[uri_param(name = "period", default = "1000")]
28    period_ms: u64,
29
30    /// Converted Duration for period.
31    pub period: Duration,
32
33    /// Initial delay before the first tick (milliseconds). Default: 0.
34    #[allow(dead_code)] // Used by macro-generated Duration conversion
35    #[uri_param(name = "delay", default = "0")]
36    delay_ms: u64,
37
38    /// Converted Duration for delay.
39    pub delay: Duration,
40
41    /// Maximum number of ticks. `None` means infinite.
42    #[uri_param(name = "repeatCount")]
43    pub repeat_count: Option<u32>,
44}
45
46// ---------------------------------------------------------------------------
47// TimerComponent
48// ---------------------------------------------------------------------------
49
50/// The Timer component produces exchanges on a periodic interval.
51pub struct TimerComponent;
52
53impl TimerComponent {
54    pub fn new() -> Self {
55        Self
56    }
57}
58
59impl Default for TimerComponent {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65impl Component for TimerComponent {
66    fn scheme(&self) -> &str {
67        "timer"
68    }
69
70    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
71        let config = TimerConfig::from_uri(uri)?;
72        Ok(Box::new(TimerEndpoint {
73            uri: uri.to_string(),
74            config,
75        }))
76    }
77}
78
79// ---------------------------------------------------------------------------
80// TimerEndpoint
81// ---------------------------------------------------------------------------
82
83struct TimerEndpoint {
84    uri: String,
85    config: TimerConfig,
86}
87
88impl Endpoint for TimerEndpoint {
89    fn uri(&self) -> &str {
90        &self.uri
91    }
92
93    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
94        Ok(Box::new(TimerConsumer {
95            config: self.config.clone(),
96        }))
97    }
98
99    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
100        Err(CamelError::EndpointCreationFailed(
101            "timer endpoint does not support producers".to_string(),
102        ))
103    }
104}
105
106// ---------------------------------------------------------------------------
107// TimerConsumer
108// ---------------------------------------------------------------------------
109
110struct TimerConsumer {
111    config: TimerConfig,
112}
113
114#[async_trait]
115impl Consumer for TimerConsumer {
116    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
117        let config = self.config.clone();
118
119        // Initial delay (cancellable so shutdown isn't blocked by long delays)
120        if !config.delay.is_zero() {
121            tokio::select! {
122                _ = time::sleep(config.delay) => {}
123                _ = context.cancelled() => {
124                    debug!(timer = config.name, "Timer cancelled during initial delay");
125                    return Ok(());
126                }
127            }
128        }
129
130        let mut interval = time::interval(config.period);
131        let mut count: u32 = 0;
132
133        loop {
134            tokio::select! {
135                _ = context.cancelled() => {
136                    debug!(timer = config.name, "Timer received cancellation, stopping");
137                    break;
138                }
139                _ = interval.tick() => {
140                    count += 1;
141
142                    debug!(timer = config.name, count, "Timer tick");
143
144                    let mut exchange = Exchange::new(Message::new(format!(
145                        "timer://{} tick #{}",
146                        config.name, count
147                    )));
148                    exchange.input.set_header(
149                        "CamelTimerName",
150                        serde_json::Value::String(config.name.clone()),
151                    );
152                    exchange
153                        .input
154                        .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
155
156                    if context.send(exchange).await.is_err() {
157                        // Channel closed, route was stopped
158                        break;
159                    }
160
161                    if let Some(max) = config.repeat_count
162                        && count >= max
163                    {
164                        break;
165                    }
166                }
167            }
168        }
169
170        Ok(())
171    }
172
173    async fn stop(&mut self) -> Result<(), CamelError> {
174        Ok(())
175    }
176}
177
178// ---------------------------------------------------------------------------
179// Tests
180// ---------------------------------------------------------------------------
181
182#[cfg(test)]
183mod tests {
184    use super::*;
185
186    #[test]
187    fn test_timer_config_defaults() {
188        let config = TimerConfig::from_uri("timer:tick").unwrap();
189        assert_eq!(config.name, "tick");
190        assert_eq!(config.period, Duration::from_millis(1000));
191        assert_eq!(config.delay, Duration::from_millis(0));
192        assert_eq!(config.repeat_count, None);
193    }
194
195    #[test]
196    fn test_timer_config_with_params() {
197        let config =
198            TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
199        assert_eq!(config.name, "myTimer");
200        assert_eq!(config.period, Duration::from_millis(500));
201        assert_eq!(config.delay, Duration::from_millis(100));
202        assert_eq!(config.repeat_count, Some(5));
203    }
204
205    #[test]
206    fn test_timer_config_wrong_scheme() {
207        let result = TimerConfig::from_uri("log:info");
208        assert!(result.is_err());
209    }
210
211    #[test]
212    fn test_timer_component_scheme() {
213        let component = TimerComponent::new();
214        assert_eq!(component.scheme(), "timer");
215    }
216
217    #[test]
218    fn test_timer_component_creates_endpoint() {
219        let component = TimerComponent::new();
220        let endpoint = component.create_endpoint("timer:tick?period=1000");
221        assert!(endpoint.is_ok());
222    }
223
224    #[test]
225    fn test_timer_endpoint_no_producer() {
226        let ctx = ProducerContext::new();
227        let component = TimerComponent::new();
228        let endpoint = component.create_endpoint("timer:tick").unwrap();
229        let producer = endpoint.create_producer(&ctx);
230        assert!(producer.is_err());
231    }
232
233    #[tokio::test]
234    async fn test_timer_consumer_fires() {
235        let component = TimerComponent::new();
236        let endpoint = component
237            .create_endpoint("timer:test?period=50&repeatCount=3")
238            .unwrap();
239        let mut consumer = endpoint.create_consumer().unwrap();
240
241        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
242        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
243
244        // Run consumer in background
245        tokio::spawn(async move {
246            consumer.start(ctx).await.unwrap();
247        });
248
249        // Collect exchanges
250        let mut received = Vec::new();
251        while let Some(envelope) = rx.recv().await {
252            received.push(envelope.exchange);
253            if received.len() == 3 {
254                break;
255            }
256        }
257
258        assert_eq!(received.len(), 3);
259
260        // Verify headers on the first exchange
261        let first = &received[0];
262        assert_eq!(
263            first.input.header("CamelTimerName"),
264            Some(&serde_json::Value::String("test".into()))
265        );
266        assert_eq!(
267            first.input.header("CamelTimerCounter"),
268            Some(&serde_json::Value::Number(1.into()))
269        );
270    }
271
272    #[tokio::test]
273    async fn test_timer_consumer_respects_cancellation() {
274        use tokio_util::sync::CancellationToken;
275
276        let token = CancellationToken::new();
277        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
278        let ctx = ConsumerContext::new(tx, token.clone());
279
280        let mut consumer = TimerConsumer {
281            config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
282        };
283
284        let handle = tokio::spawn(async move {
285            consumer.start(ctx).await.unwrap();
286        });
287
288        // Let it fire a few times
289        tokio::time::sleep(Duration::from_millis(180)).await;
290        token.cancel();
291
292        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
293        assert!(
294            result.is_ok(),
295            "Consumer should have stopped after cancellation"
296        );
297
298        let mut count = 0;
299        while rx.try_recv().is_ok() {
300            count += 1;
301        }
302        assert!(
303            count >= 2,
304            "Expected at least 2 exchanges before cancellation, got {count}"
305        );
306    }
307}