Skip to main content

camel_component_timer/
lib.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::time;
5use tracing::debug;
6
7use camel_api::{BoxProcessor, CamelError, Exchange, Message};
8use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
9use camel_endpoint::parse_uri;
10
11// ---------------------------------------------------------------------------
12// TimerConfig
13// ---------------------------------------------------------------------------
14
/// Configuration parsed from a timer URI.
///
/// Format: `timer:name?period=1000&delay=0&repeatCount=0`
///
/// `period` and `delay` are written as whole milliseconds in the URI and are
/// stored here as [`Duration`] values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimerConfig {
    /// Timer name (the path portion of the URI).
    pub name: String,
    /// Interval between ticks. Default: 1000 ms.
    pub period: Duration,
    /// Initial delay before the first tick. Default: 0 ms.
    pub delay: Duration,
    /// Maximum number of ticks. `None` means infinite.
    pub repeat_count: Option<u32>,
}
29
30impl TimerConfig {
31    /// Parse a timer URI into a config.
32    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
33        let parts = parse_uri(uri)?;
34        if parts.scheme != "timer" {
35            return Err(CamelError::InvalidUri(format!(
36                "expected scheme 'timer', got '{}'",
37                parts.scheme
38            )));
39        }
40
41        let period = parts
42            .params
43            .get("period")
44            .and_then(|v| v.parse::<u64>().ok())
45            .unwrap_or(1000);
46
47        let delay = parts
48            .params
49            .get("delay")
50            .and_then(|v| v.parse::<u64>().ok())
51            .unwrap_or(0);
52
53        let repeat_count = parts
54            .params
55            .get("repeatCount")
56            .and_then(|v| v.parse::<u32>().ok());
57
58        Ok(Self {
59            name: parts.path,
60            period: Duration::from_millis(period),
61            delay: Duration::from_millis(delay),
62            repeat_count,
63        })
64    }
65}
66
67// ---------------------------------------------------------------------------
68// TimerComponent
69// ---------------------------------------------------------------------------
70
/// The Timer component produces exchanges on a periodic interval.
///
/// The type carries no state, so it is cheap to copy and every instance is
/// interchangeable.
#[derive(Debug, Clone, Copy)]
pub struct TimerComponent;

impl TimerComponent {
    /// Create a new timer component.
    #[must_use]
    pub fn new() -> Self {
        Self
    }
}
79
80impl Default for TimerComponent {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl Component for TimerComponent {
87    fn scheme(&self) -> &str {
88        "timer"
89    }
90
91    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
92        let config = TimerConfig::from_uri(uri)?;
93        Ok(Box::new(TimerEndpoint {
94            uri: uri.to_string(),
95            config,
96        }))
97    }
98}
99
100// ---------------------------------------------------------------------------
101// TimerEndpoint
102// ---------------------------------------------------------------------------
103
104struct TimerEndpoint {
105    uri: String,
106    config: TimerConfig,
107}
108
109impl Endpoint for TimerEndpoint {
110    fn uri(&self) -> &str {
111        &self.uri
112    }
113
114    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
115        Ok(Box::new(TimerConsumer {
116            config: self.config.clone(),
117        }))
118    }
119
120    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
121        Err(CamelError::EndpointCreationFailed(
122            "timer endpoint does not support producers".to_string(),
123        ))
124    }
125}
126
127// ---------------------------------------------------------------------------
128// TimerConsumer
129// ---------------------------------------------------------------------------
130
131struct TimerConsumer {
132    config: TimerConfig,
133}
134
135#[async_trait]
136impl Consumer for TimerConsumer {
137    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
138        let config = self.config.clone();
139
140        // Initial delay (cancellable so shutdown isn't blocked by long delays)
141        if !config.delay.is_zero() {
142            tokio::select! {
143                _ = time::sleep(config.delay) => {}
144                _ = context.cancelled() => {
145                    debug!(timer = config.name, "Timer cancelled during initial delay");
146                    return Ok(());
147                }
148            }
149        }
150
151        let mut interval = time::interval(config.period);
152        let mut count: u32 = 0;
153
154        loop {
155            tokio::select! {
156                _ = context.cancelled() => {
157                    debug!(timer = config.name, "Timer received cancellation, stopping");
158                    break;
159                }
160                _ = interval.tick() => {
161                    count += 1;
162
163                    debug!(timer = config.name, count, "Timer tick");
164
165                    let mut exchange = Exchange::new(Message::new(format!(
166                        "timer://{} tick #{}",
167                        config.name, count
168                    )));
169                    exchange.input.set_header(
170                        "CamelTimerName",
171                        serde_json::Value::String(config.name.clone()),
172                    );
173                    exchange
174                        .input
175                        .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
176
177                    if context.send(exchange).await.is_err() {
178                        // Channel closed, route was stopped
179                        break;
180                    }
181
182                    if let Some(max) = config.repeat_count
183                        && count >= max
184                    {
185                        break;
186                    }
187                }
188            }
189        }
190
191        Ok(())
192    }
193
194    async fn stop(&mut self) -> Result<(), CamelError> {
195        Ok(())
196    }
197}
198
199// ---------------------------------------------------------------------------
200// Tests
201// ---------------------------------------------------------------------------
202
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare `timer:name` URI yields the documented defaults.
    #[test]
    fn test_timer_config_defaults() {
        let config = TimerConfig::from_uri("timer:tick").unwrap();
        assert_eq!(config.name, "tick");
        assert_eq!(config.period, Duration::from_millis(1000));
        assert_eq!(config.delay, Duration::from_millis(0));
        assert_eq!(config.repeat_count, None);
    }

    /// All three query parameters are picked up from the URI.
    #[test]
    fn test_timer_config_with_params() {
        let config =
            TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
        assert_eq!(config.name, "myTimer");
        assert_eq!(config.period, Duration::from_millis(500));
        assert_eq!(config.delay, Duration::from_millis(100));
        assert_eq!(config.repeat_count, Some(5));
    }

    /// A URI with a scheme other than `timer` is rejected.
    #[test]
    fn test_timer_config_wrong_scheme() {
        let result = TimerConfig::from_uri("log:info");
        assert!(result.is_err());
    }

    #[test]
    fn test_timer_component_scheme() {
        let component = TimerComponent::new();
        assert_eq!(component.scheme(), "timer");
    }

    #[test]
    fn test_timer_component_creates_endpoint() {
        let component = TimerComponent::new();
        let endpoint = component.create_endpoint("timer:tick?period=1000");
        assert!(endpoint.is_ok());
    }

    /// Timer endpoints are consumer-only, so asking for a producer must fail.
    #[test]
    fn test_timer_endpoint_no_producer() {
        use std::sync::Arc;
        use tokio::sync::Mutex;

        // NullRouteController for testing
        struct NullRouteController;
        #[async_trait::async_trait]
        impl camel_api::RouteController for NullRouteController {
            async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
                None
            }
            async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
        }

        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
        let component = TimerComponent::new();
        let endpoint = component.create_endpoint("timer:tick").unwrap();
        let producer = endpoint.create_producer(&ctx);
        assert!(producer.is_err());
    }

    /// The consumer delivers the configured number of exchanges with the
    /// expected timer headers.
    #[tokio::test]
    async fn test_timer_consumer_fires() {
        let component = TimerComponent::new();
        let endpoint = component
            .create_endpoint("timer:test?period=50&repeatCount=3")
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());

        // Run consumer in background
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Collect exchanges. The timeout turns a timer that never fires into a
        // test failure instead of a hung test run.
        let received = tokio::time::timeout(Duration::from_secs(5), async {
            let mut received = Vec::new();
            while let Some(envelope) = rx.recv().await {
                received.push(envelope.exchange);
                if received.len() == 3 {
                    break;
                }
            }
            received
        })
        .await
        .expect("timer did not deliver its exchanges within 5 seconds");

        assert_eq!(received.len(), 3);

        // Verify headers on the first exchange
        let first = &received[0];
        assert_eq!(
            first.input.header("CamelTimerName"),
            Some(&serde_json::Value::String("test".into()))
        );
        assert_eq!(
            first.input.header("CamelTimerCounter"),
            Some(&serde_json::Value::Number(1.into()))
        );
    }

    /// Cancelling the token stops an otherwise unlimited timer promptly.
    #[tokio::test]
    async fn test_timer_consumer_respects_cancellation() {
        use tokio_util::sync::CancellationToken;

        let token = CancellationToken::new();
        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone());

        let mut consumer = TimerConsumer {
            config: TimerConfig {
                name: "cancel-test".to_string(),
                period: Duration::from_millis(50),
                delay: Duration::from_millis(0),
                repeat_count: None,
            },
        };

        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Let it fire a few times
        tokio::time::sleep(Duration::from_millis(180)).await;
        token.cancel();

        // The outer result is the timeout; the inner one is the task's join
        // result. Checking both ensures a panicking consumer fails the test.
        let joined = tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .expect("Consumer should have stopped after cancellation");
        joined.expect("Consumer task should finish without panicking");

        let mut count = 0;
        while rx.try_recv().is_ok() {
            count += 1;
        }
        assert!(
            count >= 2,
            "Expected at least 2 exchanges before cancellation, got {count}"
        );
    }
}