camel_component_timer/
lib.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::time;
5use tracing::debug;
6
7use camel_api::{BoxProcessor, CamelError, Exchange, Message};
8use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
9use camel_endpoint::parse_uri;
10
/// Settings for a single timer, normally built from a `timer:` URI via
/// [`TimerConfig::from_uri`].
#[derive(Debug, Clone)]
pub struct TimerConfig {
    /// Timer name, taken from the path part of the URI. It is used in log
    /// output, in the generated message body and in the `CamelTimerName` header.
    pub name: String,
    /// Time between two consecutive ticks (`period` URI parameter, in
    /// milliseconds; 1000 ms when the parameter is absent).
    pub period: Duration,
    /// Time to wait before the first tick (`delay` URI parameter, in
    /// milliseconds; 0 ms when the parameter is absent).
    pub delay: Duration,
    /// Optional cap on the number of ticks (`repeatCount` URI parameter).
    /// `None` lets the timer run until it is cancelled.
    pub repeat_count: Option<u32>,
}
29
30impl TimerConfig {
31 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
33 let parts = parse_uri(uri)?;
34 if parts.scheme != "timer" {
35 return Err(CamelError::InvalidUri(format!(
36 "expected scheme 'timer', got '{}'",
37 parts.scheme
38 )));
39 }
40
41 let period = parts
42 .params
43 .get("period")
44 .and_then(|v| v.parse::<u64>().ok())
45 .unwrap_or(1000);
46
47 let delay = parts
48 .params
49 .get("delay")
50 .and_then(|v| v.parse::<u64>().ok())
51 .unwrap_or(0);
52
53 let repeat_count = parts
54 .params
55 .get("repeatCount")
56 .and_then(|v| v.parse::<u32>().ok());
57
58 Ok(Self {
59 name: parts.path,
60 period: Duration::from_millis(period),
61 delay: Duration::from_millis(delay),
62 repeat_count,
63 })
64 }
65}
66
/// Component for the `timer` URI scheme.
///
/// It carries no state; endpoints are created from URIs such as
/// `timer:tick?period=1000`.
pub struct TimerComponent;
73
74impl TimerComponent {
75 pub fn new() -> Self {
76 Self
77 }
78}
79
80impl Default for TimerComponent {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl Component for TimerComponent {
87 fn scheme(&self) -> &str {
88 "timer"
89 }
90
91 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
92 let config = TimerConfig::from_uri(uri)?;
93 Ok(Box::new(TimerEndpoint {
94 uri: uri.to_string(),
95 config,
96 }))
97 }
98}
99
/// Endpoint created by [`TimerComponent`] for one timer URI.
struct TimerEndpoint {
    /// The URI string exactly as it was passed to `create_endpoint`.
    uri: String,
    /// Configuration parsed from `uri`.
    config: TimerConfig,
}
108
109impl Endpoint for TimerEndpoint {
110 fn uri(&self) -> &str {
111 &self.uri
112 }
113
114 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
115 Ok(Box::new(TimerConsumer {
116 config: self.config.clone(),
117 }))
118 }
119
120 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
121 Err(CamelError::EndpointCreationFailed(
122 "timer endpoint does not support producers".to_string(),
123 ))
124 }
125}
126
/// Consumer that emits one exchange per timer tick.
struct TimerConsumer {
    /// Timer settings (name, period, initial delay, optional repeat limit).
    config: TimerConfig,
}
134
135#[async_trait]
136impl Consumer for TimerConsumer {
137 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
138 let config = self.config.clone();
139
140 if !config.delay.is_zero() {
142 tokio::select! {
143 _ = time::sleep(config.delay) => {}
144 _ = context.cancelled() => {
145 debug!(timer = config.name, "Timer cancelled during initial delay");
146 return Ok(());
147 }
148 }
149 }
150
151 let mut interval = time::interval(config.period);
152 let mut count: u32 = 0;
153
154 loop {
155 tokio::select! {
156 _ = context.cancelled() => {
157 debug!(timer = config.name, "Timer received cancellation, stopping");
158 break;
159 }
160 _ = interval.tick() => {
161 count += 1;
162
163 debug!(timer = config.name, count, "Timer tick");
164
165 let mut exchange = Exchange::new(Message::new(format!(
166 "timer://{} tick #{}",
167 config.name, count
168 )));
169 exchange.input.set_header(
170 "CamelTimerName",
171 serde_json::Value::String(config.name.clone()),
172 );
173 exchange
174 .input
175 .set_header("CamelTimerCounter", serde_json::Value::Number(count.into()));
176
177 if context.send(exchange).await.is_err() {
178 break;
180 }
181
182 if let Some(max) = config.repeat_count
183 && count >= max
184 {
185 break;
186 }
187 }
188 }
189 }
190
191 Ok(())
192 }
193
194 async fn stop(&mut self) -> Result<(), CamelError> {
195 Ok(())
196 }
197}
198
/// Unit tests for URI parsing, component/endpoint wiring and the consumer loop.
#[cfg(test)]
mod tests {
    use super::*;

    /// A URI without query parameters yields the documented defaults.
    #[test]
    fn test_timer_config_defaults() {
        let config = TimerConfig::from_uri("timer:tick").unwrap();
        assert_eq!(config.name, "tick");
        assert_eq!(config.period, Duration::from_millis(1000));
        assert_eq!(config.delay, Duration::from_millis(0));
        assert_eq!(config.repeat_count, None);
    }

    /// Every supported query parameter is picked up.
    #[test]
    fn test_timer_config_with_params() {
        let config =
            TimerConfig::from_uri("timer:myTimer?period=500&delay=100&repeatCount=5").unwrap();
        assert_eq!(config.name, "myTimer");
        assert_eq!(config.period, Duration::from_millis(500));
        assert_eq!(config.delay, Duration::from_millis(100));
        assert_eq!(config.repeat_count, Some(5));
    }

    /// A non-`timer` scheme is rejected.
    #[test]
    fn test_timer_config_wrong_scheme() {
        let result = TimerConfig::from_uri("log:info");
        assert!(result.is_err());
    }

    #[test]
    fn test_timer_component_scheme() {
        let component = TimerComponent::new();
        assert_eq!(component.scheme(), "timer");
    }

    #[test]
    fn test_timer_component_creates_endpoint() {
        let component = TimerComponent::new();
        let endpoint = component.create_endpoint("timer:tick?period=1000");
        assert!(endpoint.is_ok());
    }

    /// The timer endpoint is consumer-only; asking for a producer must fail.
    #[test]
    fn test_timer_endpoint_no_producer() {
        use std::sync::Arc;
        use tokio::sync::Mutex;

        // No-op route controller, only needed to build a `ProducerContext`.
        struct NullRouteController;
        #[async_trait::async_trait]
        impl camel_api::RouteController for NullRouteController {
            async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
                None
            }
            async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
            async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
                Ok(())
            }
        }

        let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
        let component = TimerComponent::new();
        let endpoint = component.create_endpoint("timer:tick").unwrap();
        let producer = endpoint.create_producer(&ctx);
        assert!(producer.is_err());
    }

    /// A timer limited to three repeats delivers three exchanges, the first of
    /// which carries the timer name and counter `1` in its headers.
    #[tokio::test]
    async fn test_timer_consumer_fires() {
        let component = TimerComponent::new();
        let endpoint = component
            .create_endpoint("timer:test?period=50&repeatCount=3")
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());

        // Run the consumer in the background while this task drains the channel.
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        let mut received = Vec::new();
        while let Some(envelope) = rx.recv().await {
            received.push(envelope.exchange);
            if received.len() == 3 {
                break;
            }
        }

        assert_eq!(received.len(), 3);

        let first = &received[0];
        assert_eq!(
            first.input.header("CamelTimerName"),
            Some(&serde_json::Value::String("test".into()))
        );
        assert_eq!(
            first.input.header("CamelTimerCounter"),
            Some(&serde_json::Value::Number(1.into()))
        );
    }

    /// Cancelling the token stops an otherwise unlimited timer promptly.
    #[tokio::test]
    async fn test_timer_consumer_respects_cancellation() {
        use tokio_util::sync::CancellationToken;

        let token = CancellationToken::new();
        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone());

        let mut consumer = TimerConsumer {
            config: TimerConfig {
                name: "cancel-test".to_string(),
                period: Duration::from_millis(50),
                delay: Duration::from_millis(0),
                repeat_count: None,
            },
        };

        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Let a few ticks happen before cancelling.
        tokio::time::sleep(Duration::from_millis(180)).await;
        token.cancel();

        // The outer result is the timeout, the inner one is the join result:
        // check both so that a panic inside the consumer task fails the test
        // instead of being mistaken for a clean shutdown.
        let joined = tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .expect("Consumer should have stopped after cancellation");
        joined.expect("Consumer task should finish without panicking");

        let mut count = 0;
        while rx.try_recv().is_ok() {
            count += 1;
        }
        assert!(
            count >= 2,
            "Expected at least 2 exchanges before cancellation, got {count}"
        );
    }
}