Skip to main content

camel_component_timer/
lib.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::time;
5use tracing::debug;
6
7use camel_api::{BoxProcessor, CamelError, Exchange, Message};
8use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
9use camel_endpoint::UriConfig;
10
11// ---------------------------------------------------------------------------
12// TimerConfig
13// ---------------------------------------------------------------------------
14
15/// Configuration parsed from a timer URI.
16///
17/// Format: `timer:name?period=1000&delay=0&repeatCount=0`
18#[derive(Debug, Clone, UriConfig)]
19#[uri_scheme = "timer"]
20pub struct TimerConfig {
21    /// Timer name (the path portion of the URI).
22    pub name: String,
23
24    /// Interval between ticks (milliseconds). Default: 1000.
25    #[allow(dead_code)] // Used by macro-generated Duration conversion
26    #[uri_param(name = "period", default = "1000")]
27    period_ms: u64,
28
29    /// Converted Duration for period.
30    pub period: Duration,
31
32    /// Initial delay before the first tick (milliseconds). Default: 0.
33    #[allow(dead_code)] // Used by macro-generated Duration conversion
34    #[uri_param(name = "delay", default = "0")]
35    delay_ms: u64,
36
37    /// Converted Duration for delay.
38    pub delay: Duration,
39
40    /// Maximum number of ticks. `None` means infinite.
41    #[uri_param(name = "repeatCount")]
42    pub repeat_count: Option<u32>,
43}
44
45// ---------------------------------------------------------------------------
46// TimerComponent
47// ---------------------------------------------------------------------------
48
49/// The Timer component produces exchanges on a periodic interval.
50pub struct TimerComponent;
51
52impl TimerComponent {
53    pub fn new() -> Self {
54        Self
55    }
56}
57
58impl Default for TimerComponent {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64impl Component for TimerComponent {
65    fn scheme(&self) -> &str {
66        "timer"
67    }
68
69    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
70        let config = TimerConfig::from_uri(uri)?;
71        Ok(Box::new(TimerEndpoint {
72            uri: uri.to_string(),
73            config,
74        }))
75    }
76}
77
78// ---------------------------------------------------------------------------
79// TimerEndpoint
80// ---------------------------------------------------------------------------
81
82struct TimerEndpoint {
83    uri: String,
84    config: TimerConfig,
85}
86
87impl Endpoint for TimerEndpoint {
88    fn uri(&self) -> &str {
89        &self.uri
90    }
91
92    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
93        Ok(Box::new(TimerConsumer {
94            config: self.config.clone(),
95        }))
96    }
97
98    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
99        Err(CamelError::EndpointCreationFailed(
100            "timer endpoint does not support producers".to_string(),
101        ))
102    }
103}
104
105// ---------------------------------------------------------------------------
106// TimerConsumer
107// ---------------------------------------------------------------------------
108
109struct TimerConsumer {
110    config: TimerConfig,
111}
112
113#[async_trait]
114impl Consumer for TimerConsumer {
115    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
116        let config = self.config.clone();
117
118        // Initial delay (cancellable so shutdown isn't blocked by long delays)
119        if !config.delay.is_zero() {
120            tokio::select! {
121                _ = time::sleep(config.delay) => {}
122                _ = context.cancelled() => {
123                    debug!(timer = config.name, "Timer cancelled during initial delay");
124                    return Ok(());
125                }
126            }
127        }
128
129        let mut interval = time::interval(config.period);
130        let mut count: u32 = 0;
131
132        loop {
133            tokio::select! {
134                _ = context.cancelled() => {
135                    debug!(timer = config.name, "Timer received cancellation, stopping");
136                    break;
137                }
138                _ = interval.tick() => {
139                    count += 1;
140
141                    debug!(timer = config.name, count, "Timer tick");
142
143                    let mut exchange = Exchange::new(Message::new(format!(
144                        "timer://{} tick #{}",
145                        config.name, count
146                    )));
147                    exchange.input.set_header(
148                        "CamelTimerName",
149                        serde_json::Value::String(config.name.clone()),
150                    );
151                    exchange
152                        .input
153                        .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
154
155                    if context.send(exchange).await.is_err() {
156                        // Channel closed, route was stopped
157                        break;
158                    }
159
160                    if let Some(max) = config.repeat_count
161                        && count >= max
162                    {
163                        break;
164                    }
165                }
166            }
167        }
168
169        Ok(())
170    }
171
172    async fn stop(&mut self) -> Result<(), CamelError> {
173        Ok(())
174    }
175}
176
177// ---------------------------------------------------------------------------
178// Tests
179// ---------------------------------------------------------------------------
180
181#[cfg(test)]
182mod tests {
183    use super::*;
184
185    #[test]
186    fn test_timer_config_defaults() {
187        let config = TimerConfig::from_uri("timer:tick").unwrap();
188        assert_eq!(config.name, "tick");
189        assert_eq!(config.period, Duration::from_millis(1000));
190        assert_eq!(config.delay, Duration::from_millis(0));
191        assert_eq!(config.repeat_count, None);
192    }
193
194    #[test]
195    fn test_timer_config_with_params() {
196        let config =
197            TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
198        assert_eq!(config.name, "myTimer");
199        assert_eq!(config.period, Duration::from_millis(500));
200        assert_eq!(config.delay, Duration::from_millis(100));
201        assert_eq!(config.repeat_count, Some(5));
202    }
203
204    #[test]
205    fn test_timer_config_wrong_scheme() {
206        let result = TimerConfig::from_uri("log:info");
207        assert!(result.is_err());
208    }
209
210    #[test]
211    fn test_timer_component_scheme() {
212        let component = TimerComponent::new();
213        assert_eq!(component.scheme(), "timer");
214    }
215
216    #[test]
217    fn test_timer_component_creates_endpoint() {
218        let component = TimerComponent::new();
219        let endpoint = component.create_endpoint("timer:tick?period=1000");
220        assert!(endpoint.is_ok());
221    }
222
223    #[test]
224    fn test_timer_endpoint_no_producer() {
225        let ctx = ProducerContext::new();
226        let component = TimerComponent::new();
227        let endpoint = component.create_endpoint("timer:tick").unwrap();
228        let producer = endpoint.create_producer(&ctx);
229        assert!(producer.is_err());
230    }
231
232    #[tokio::test]
233    async fn test_timer_consumer_fires() {
234        let component = TimerComponent::new();
235        let endpoint = component
236            .create_endpoint("timer:test?period=50&repeatCount=3")
237            .unwrap();
238        let mut consumer = endpoint.create_consumer().unwrap();
239
240        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
241        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
242
243        // Run consumer in background
244        tokio::spawn(async move {
245            consumer.start(ctx).await.unwrap();
246        });
247
248        // Collect exchanges
249        let mut received = Vec::new();
250        while let Some(envelope) = rx.recv().await {
251            received.push(envelope.exchange);
252            if received.len() == 3 {
253                break;
254            }
255        }
256
257        assert_eq!(received.len(), 3);
258
259        // Verify headers on the first exchange
260        let first = &received[0];
261        assert_eq!(
262            first.input.header("CamelTimerName"),
263            Some(&serde_json::Value::String("test".into()))
264        );
265        assert_eq!(
266            first.input.header("CamelTimerCounter"),
267            Some(&serde_json::Value::Number(1.into()))
268        );
269    }
270
271    #[tokio::test]
272    async fn test_timer_consumer_respects_cancellation() {
273        use tokio_util::sync::CancellationToken;
274
275        let token = CancellationToken::new();
276        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
277        let ctx = ConsumerContext::new(tx, token.clone());
278
279        let mut consumer = TimerConsumer {
280            config: TimerConfig::from_uri("timer:cancel-test?period=50").unwrap(),
281        };
282
283        let handle = tokio::spawn(async move {
284            consumer.start(ctx).await.unwrap();
285        });
286
287        // Let it fire a few times
288        tokio::time::sleep(Duration::from_millis(180)).await;
289        token.cancel();
290
291        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
292        assert!(
293            result.is_ok(),
294            "Consumer should have stopped after cancellation"
295        );
296
297        let mut count = 0;
298        while rx.try_recv().is_ok() {
299            count += 1;
300        }
301        assert!(
302            count >= 2,
303            "Expected at least 2 exchanges before cancellation, got {count}"
304        );
305    }
306}