Skip to main content

camel_component_timer/
lib.rs

1//! Timer component for rust-camel — fires `Exchange` events on configurable period, delay, and repeat-count schedules.
2//!
3//! Main types: `TimerComponent`, `TimerConsumer`, `TimerConfig`.
4//! URI format: `timer:name?period=1000&delay=0&repeatCount=0`.
5
6use std::time::Duration;
7
8use async_trait::async_trait;
9use tokio::time;
10use tracing::debug;
11
12use camel_component_api::UriConfig;
13use camel_component_api::{BoxProcessor, CamelError, Exchange, Message};
14use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
15
16// ---------------------------------------------------------------------------
17// TimerConfig
18// ---------------------------------------------------------------------------
19
20/// Configuration parsed from a timer URI.
21///
22/// Format: `timer:name?period=1000&delay=0&repeatCount=0`
23#[derive(Debug, Clone, UriConfig)]
24#[uri_scheme = "timer"]
25#[uri_config(skip_impl, crate = "camel_component_api")]
26pub struct TimerConfig {
27    /// Timer name (the path portion of the URI).
28    pub name: String,
29
30    /// Interval between ticks (milliseconds). Default: 1000.
31    #[allow(dead_code)] // Used by macro-generated Duration conversion
32    #[uri_param(name = "period", default = "1000")]
33    period_ms: u64,
34
35    /// Converted Duration for period.
36    pub period: Duration,
37
38    /// Initial delay before the first tick (milliseconds). Default: 0.
39    #[allow(dead_code)] // Used by macro-generated Duration conversion
40    #[uri_param(name = "delay", default = "0")]
41    delay_ms: u64,
42
43    /// Converted Duration for delay.
44    pub delay: Duration,
45
46    /// Maximum number of ticks. `None` means infinite.
47    #[uri_param(name = "repeatCount")]
48    pub repeat_count: Option<u32>,
49}
50
51impl UriConfig for TimerConfig {
52    fn scheme() -> &'static str {
53        "timer"
54    }
55
56    fn from_uri(uri: &str) -> Result<Self, CamelError> {
57        let parts = camel_component_api::parse_uri(uri)?;
58        Self::from_components(parts)
59    }
60
61    fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
62        let config = Self::parse_uri_components(parts)?;
63        config.validate()
64    }
65
66    fn validate(self) -> Result<Self, CamelError> {
67        if self.name.trim().is_empty() {
68            return Err(CamelError::InvalidUri(
69                "timer name must not be empty".to_string(),
70            ));
71        }
72        if self.period.is_zero() {
73            return Err(CamelError::InvalidUri(
74                "timer period must be greater than 0".to_string(),
75            ));
76        }
77        Ok(self)
78    }
79}
80
81// ---------------------------------------------------------------------------
82// TimerComponent
83// ---------------------------------------------------------------------------
84
/// The Timer component produces exchanges on a periodic interval.
///
/// It is a stateless unit type, so it is cheap to copy and can be created in
/// `const` contexts. `Default` is derived and is equivalent to [`TimerComponent::new`].
#[derive(Debug, Clone, Copy, Default)]
pub struct TimerComponent;

impl TimerComponent {
    /// Creates a new timer component.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
99
100impl Component for TimerComponent {
101    fn scheme(&self) -> &str {
102        "timer"
103    }
104
105    fn create_endpoint(
106        &self,
107        uri: &str,
108        _ctx: &dyn camel_component_api::ComponentContext,
109    ) -> Result<Box<dyn Endpoint>, CamelError> {
110        let config = TimerConfig::from_uri(uri)?;
111        Ok(Box::new(TimerEndpoint {
112            uri: uri.to_string(),
113            config,
114        }))
115    }
116}
117
118// ---------------------------------------------------------------------------
119// TimerEndpoint
120// ---------------------------------------------------------------------------
121
122struct TimerEndpoint {
123    uri: String,
124    config: TimerConfig,
125}
126
127impl Endpoint for TimerEndpoint {
128    fn uri(&self) -> &str {
129        &self.uri
130    }
131
132    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
133        Ok(Box::new(TimerConsumer {
134            config: self.config.clone(),
135        }))
136    }
137
138    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
139        Err(CamelError::EndpointCreationFailed(
140            "timer endpoint does not support producers".to_string(),
141        ))
142    }
143}
144
145// ---------------------------------------------------------------------------
146// TimerConsumer
147// ---------------------------------------------------------------------------
148
149struct TimerConsumer {
150    config: TimerConfig,
151}
152
153#[async_trait]
154impl Consumer for TimerConsumer {
155    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
156        let config = self.config.clone();
157
158        // Initial delay (cancellable so shutdown isn't blocked by long delays)
159        if !config.delay.is_zero() {
160            tokio::select! {
161                _ = time::sleep(config.delay) => {}
162                _ = context.cancelled() => {
163                    debug!(timer = config.name, "Timer cancelled during initial delay");
164                    return Ok(());
165                }
166            }
167        }
168
169        let mut interval = time::interval(config.period);
170        let mut count: u32 = 0;
171
172        loop {
173            tokio::select! {
174                _ = context.cancelled() => {
175                    debug!(timer = config.name, "Timer received cancellation, stopping");
176                    break;
177                }
178                _ = interval.tick() => {
179                    count += 1;
180
181                    debug!(timer = config.name, count, "Timer tick");
182
183                    let mut exchange = Exchange::new(Message::new(format!(
184                        "timer://{} tick #{}",
185                        config.name, count
186                    )));
187                    exchange.input.set_header(
188                        "CamelTimerName",
189                        serde_json::Value::String(config.name.clone()),
190                    );
191                    exchange
192                        .input
193                        .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
194
195                    if context.send(exchange).await.is_err() {
196                        // Channel closed, route was stopped
197                        break;
198                    }
199
200                    if let Some(max) = config.repeat_count
201                        && count >= max
202                    {
203                        break;
204                    }
205                }
206            }
207        }
208
209        Ok(())
210    }
211
212    async fn stop(&mut self) -> Result<(), CamelError> {
213        Ok(())
214    }
215}
216
217// ---------------------------------------------------------------------------
218// Tests
219// ---------------------------------------------------------------------------
220
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::NoOpComponentContext;

    /// `period=0` must be refused, and the error must name the offending parameter.
    #[test]
    fn test_zero_period_rejected() {
        let result = TimerConfig::from_uri("timer:tick?period=0");
        assert!(result.is_err(), "period=0 should be rejected");
        let err_msg = result.unwrap_err().to_string();
        assert!(err_msg.contains("period"), "error should mention 'period'");
    }

    /// A URI without a timer name is invalid.
    #[test]
    fn test_timer_empty_name_rejected() {
        let result = TimerConfig::from_uri("timer:");
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("must not be empty"), "unexpected error: {err}");
    }

    /// With no query parameters: period 1000 ms, no delay, no repeat limit.
    #[test]
    fn test_timer_config_defaults() {
        let config = TimerConfig::from_uri("timer:tick").unwrap();
        assert_eq!(config.name, "tick");
        assert_eq!(config.period, Duration::from_millis(1000));
        assert_eq!(config.delay, Duration::from_millis(0));
        assert_eq!(config.repeat_count, None);
    }

    /// Explicit query parameters override every default.
    #[test]
    fn test_timer_config_with_params() {
        let config =
            TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
        assert_eq!(config.name, "myTimer");
        assert_eq!(config.period, Duration::from_millis(500));
        assert_eq!(config.delay, Duration::from_millis(100));
        assert_eq!(config.repeat_count, Some(5));
    }

    /// URIs for another scheme are not accepted.
    #[test]
    fn test_timer_config_wrong_scheme() {
        let result = TimerConfig::from_uri("log:info");
        assert!(result.is_err());
    }

    #[test]
    fn test_timer_component_scheme() {
        let component = TimerComponent::new();
        assert_eq!(component.scheme(), "timer");
    }

    #[test]
    fn test_timer_component_creates_endpoint() {
        let component = TimerComponent::new();
        let endpoint = component.create_endpoint("timer:tick?period=1000", &NoOpComponentContext);
        assert!(endpoint.is_ok());
    }

    /// Timer endpoints are consumer-only; asking for a producer must fail.
    #[test]
    fn test_timer_endpoint_no_producer() {
        let ctx = ProducerContext::new();
        let component = TimerComponent::new();
        let endpoint = component
            .create_endpoint("timer:tick", &NoOpComponentContext)
            .unwrap();
        let producer = endpoint.create_producer(&ctx);
        assert!(producer.is_err());
    }

    /// A timer with `repeatCount=3` delivers three exchanges with the expected
    /// name header and a counter running from 1 to 3.
    #[tokio::test]
    async fn test_timer_consumer_fires() {
        let component = TimerComponent::new();
        let endpoint = component
            .create_endpoint("timer:test?period=50&repeatCount=3", &NoOpComponentContext)
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());

        // Run consumer in background
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Collect exchanges
        let mut received = Vec::new();
        while let Some(envelope) = rx.recv().await {
            received.push(envelope.exchange);
            if received.len() == 3 {
                break;
            }
        }

        assert_eq!(received.len(), 3);

        // Verify headers on the first exchange
        let first = &received[0];
        assert_eq!(
            first.input.header("CamelTimerName"),
            Some(&serde_json::Value::String("test".into()))
        );
        assert_eq!(
            first.input.header("CamelTimerCounter"),
            Some(&serde_json::Value::Number(1.into()))
        );

        // The counter keeps increasing: the last exchange is tick #3.
        let last = &received[2];
        assert_eq!(
            last.input.header("CamelTimerCounter"),
            Some(&serde_json::Value::Number(3.into()))
        );
    }

    /// Cancelling the token stops an otherwise unlimited timer promptly, and
    /// the consumer task finishes without panicking.
    #[tokio::test]
    async fn test_timer_consumer_respects_cancellation() {
        use tokio_util::sync::CancellationToken;

        let token = CancellationToken::new();
        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone());

        let mut consumer = TimerConsumer {
            config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
        };

        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Let it fire a few times
        tokio::time::sleep(Duration::from_millis(180)).await;
        token.cancel();

        let joined = tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .expect("Consumer should have stopped after cancellation");
        // A timeout-only check would let a panic inside the task go unnoticed.
        joined.expect("consumer task should finish without panicking");

        let mut count = 0;
        while rx.try_recv().is_ok() {
            count += 1;
        }
        assert!(
            count >= 2,
            "Expected at least 2 exchanges before cancellation, got {count}"
        );
    }
}
365}