Skip to main content

camel_component_controlbus/
lib.rs

//! ControlBus component for managing route lifecycle.
//!
//! This component allows routes to control other routes in the Camel context
//! using the ControlBus enterprise integration pattern (EIP). It provides
//! lifecycle operations (start, stop, suspend, resume, restart) and can report
//! the current status of a route.
//!
//! # URI Format
//!
//! `controlbus:route?routeId=my-route&action=start`
//!
//! # Parameters
//!
//! - `routeId`: The ID of the route to operate on (optional; when omitted, the
//!   `CamelRouteId` exchange header is used instead)
//! - `action`: The action to perform (required): `start`, `stop`, `suspend`,
//!   `resume`, `restart`, `status`
//!
//! # Example
//!
//! ```ignore
//! // Start a route
//! from("timer:start?period=60000")
//!     .to("controlbus:route?routeId=my-route&action=start");
//!
//! // Get route status
//! from("direct:getStatus")
//!     .to("controlbus:route?routeId=my-route&action=status")
//!     .log("Status: ${body}");
//! ```
28
29use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33
34#[cfg(test)]
35use async_trait::async_trait;
36use tokio::sync::Mutex;
37use tower::Service;
38use tracing::debug;
39
40use camel_api::{Body, BoxProcessor, CamelError, Exchange, RouteAction, RouteStatus};
41use camel_component::{Component, Consumer, Endpoint, ProducerContext};
42use camel_endpoint::parse_uri;
43
44// ---------------------------------------------------------------------------
45// ControlBusComponent
46// ---------------------------------------------------------------------------
47
/// The ControlBus component for managing route lifecycle.
///
/// The component itself is stateless: it only knows how to turn a
/// `controlbus:` URI into an endpoint. `Debug`, `Clone` and `Default` are
/// derived so the type can be logged, duplicated and default-constructed
/// like any other plain value.
#[derive(Debug, Clone, Default)]
pub struct ControlBusComponent;

impl ControlBusComponent {
    /// Create a new ControlBus component.
    ///
    /// Equivalent to [`ControlBusComponent::default`].
    pub fn new() -> Self {
        Self
    }
}
63
64impl Component for ControlBusComponent {
65    fn scheme(&self) -> &str {
66        "controlbus"
67    }
68
69    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
70        let parts = parse_uri(uri)?;
71
72        if parts.scheme != "controlbus" {
73            return Err(CamelError::InvalidUri(format!(
74                "expected scheme 'controlbus', got '{}'",
75                parts.scheme
76            )));
77        }
78
79        // Parse command (path portion after controlbus:)
80        let command = parts.path.clone();
81
82        // Validate command - only "route" is supported
83        if command != "route" {
84            return Err(CamelError::EndpointCreationFailed(format!(
85                "controlbus: unknown command '{}', only 'route' is supported",
86                command
87            )));
88        }
89
90        // Parse routeId parameter
91        let route_id = parts.params.get("routeId").cloned();
92
93        // Parse action parameter
94        let action = if let Some(action_str) = parts.params.get("action") {
95            Some(parse_action(action_str)?)
96        } else {
97            None
98        };
99
100        // Validate: for "route" command, action is required
101        if command == "route" && action.is_none() {
102            return Err(CamelError::EndpointCreationFailed(
103                "controlbus: 'action' parameter is required for route command".to_string(),
104            ));
105        }
106
107        Ok(Box::new(ControlBusEndpoint {
108            uri: uri.to_string(),
109            route_id,
110            action,
111        }))
112    }
113}
114
115/// Parse an action string into a RouteAction.
116fn parse_action(s: &str) -> Result<RouteAction, CamelError> {
117    match s.to_lowercase().as_str() {
118        "start" => Ok(RouteAction::Start),
119        "stop" => Ok(RouteAction::Stop),
120        "suspend" => Ok(RouteAction::Suspend),
121        "resume" => Ok(RouteAction::Resume),
122        "restart" => Ok(RouteAction::Restart),
123        "status" => Ok(RouteAction::Status),
124        _ => Err(CamelError::EndpointCreationFailed(format!(
125            "controlbus: unknown action '{}'",
126            s
127        ))),
128    }
129}
130
131// ---------------------------------------------------------------------------
132// ControlBusEndpoint
133// ---------------------------------------------------------------------------
134
135/// Endpoint for the ControlBus component.
136struct ControlBusEndpoint {
137    uri: String,
138    route_id: Option<String>,
139    action: Option<RouteAction>,
140}
141
142impl Endpoint for ControlBusEndpoint {
143    fn uri(&self) -> &str {
144        &self.uri
145    }
146
147    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
148        Err(CamelError::EndpointCreationFailed(
149            "controlbus endpoint does not support consumers".to_string(),
150        ))
151    }
152
153    fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
154        let action = self.action.clone().ok_or_else(|| {
155            CamelError::EndpointCreationFailed(
156                "controlbus: action is required to create producer".to_string(),
157            )
158        })?;
159
160        Ok(BoxProcessor::new(ControlBusProducer {
161            route_id: self.route_id.clone(),
162            action,
163            controller: ctx.route_controller().clone(),
164        }))
165    }
166}
167
168// ---------------------------------------------------------------------------
169// ControlBusProducer
170// ---------------------------------------------------------------------------
171
172/// Producer that executes control bus actions on routes.
173#[derive(Clone)]
174struct ControlBusProducer {
175    /// Route ID from URI params (may be None, in which case header is used).
176    route_id: Option<String>,
177    /// Action to perform on the route.
178    action: RouteAction,
179    /// Route controller for executing actions.
180    controller: Arc<Mutex<dyn camel_api::RouteController>>,
181}
182
183impl Service<Exchange> for ControlBusProducer {
184    type Response = Exchange;
185    type Error = CamelError;
186    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
187
188    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
189        Poll::Ready(Ok(()))
190    }
191
192    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
193        // Get route_id: prefer field, fallback to header "CamelRouteId"
194        let route_id = self.route_id.clone().or_else(|| {
195            exchange
196                .input
197                .header("CamelRouteId")
198                .and_then(|v| v.as_str().map(|s| s.to_string()))
199        });
200
201        // If no route_id → error
202        let route_id = match route_id {
203            Some(id) => id,
204            None => {
205                return Box::pin(async {
206                    Err(CamelError::ProcessorError(
207                        "controlbus: routeId required (set via URI param or CamelRouteId header)"
208                            .into(),
209                    ))
210                });
211            }
212        };
213
214        let action = self.action.clone();
215        let controller = self.controller.clone();
216
217        Box::pin(async move {
218            let mut ctrl = controller.lock().await;
219
220            debug!(
221                route_id = %route_id,
222                action = ?action,
223                "ControlBus executing action"
224            );
225
226            match action {
227                RouteAction::Start => {
228                    ctrl.start_route(&route_id).await?;
229                }
230                RouteAction::Stop => {
231                    ctrl.stop_route(&route_id).await?;
232                }
233                RouteAction::Suspend => {
234                    ctrl.suspend_route(&route_id).await?;
235                }
236                RouteAction::Resume => {
237                    ctrl.resume_route(&route_id).await?;
238                }
239                RouteAction::Restart => {
240                    ctrl.restart_route(&route_id).await?;
241                }
242                RouteAction::Status => {
243                    let status = ctrl.route_status(&route_id).ok_or_else(|| {
244                        CamelError::ProcessorError(format!(
245                            "controlbus: route '{}' not found",
246                            route_id
247                        ))
248                    })?;
249                    exchange.input.body = Body::Text(format_status(&status));
250                    return Ok(exchange);
251                }
252            }
253
254            // For all actions except Status, set body to Empty
255            exchange.input.body = Body::Empty;
256            Ok(exchange)
257        })
258    }
259}
260
261/// Format a RouteStatus for display in the exchange body.
262fn format_status(status: &RouteStatus) -> String {
263    match status {
264        RouteStatus::Stopped => "Stopped".to_string(),
265        RouteStatus::Starting => "Starting".to_string(),
266        RouteStatus::Started => "Started".to_string(),
267        RouteStatus::Stopping => "Stopping".to_string(),
268        RouteStatus::Suspended => "Suspended".to_string(),
269        RouteStatus::Failed(msg) => format!("Failed: {}", msg),
270    }
271}
272
273// ---------------------------------------------------------------------------
274// Tests
275// ---------------------------------------------------------------------------
276
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::Message;
    use tower::ServiceExt;

    /// In-memory route controller used to observe what the producer does.
    struct MockRouteController {
        routes: std::collections::HashMap<String, RouteStatus>,
    }

    impl MockRouteController {
        fn new() -> Self {
            Self {
                routes: std::collections::HashMap::new(),
            }
        }

        fn with_route(mut self, id: &str, status: RouteStatus) -> Self {
            self.routes.insert(id.to_string(), status);
            self
        }

        /// Move a known route to `next`; unknown routes yield a "not found" error.
        fn transition(&mut self, route_id: &str, next: RouteStatus) -> Result<(), CamelError> {
            match self.routes.get_mut(route_id) {
                Some(slot) => {
                    *slot = next;
                    Ok(())
                }
                None => Err(CamelError::ProcessorError(format!(
                    "route '{}' not found",
                    route_id
                ))),
            }
        }
    }

    #[async_trait]
    impl camel_api::RouteController for MockRouteController {
        async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
            self.transition(route_id, RouteStatus::Started)
        }

        async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
            self.transition(route_id, RouteStatus::Stopped)
        }

        async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
            self.transition(route_id, RouteStatus::Started)
        }

        async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
            self.transition(route_id, RouteStatus::Suspended)
        }

        async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
            self.transition(route_id, RouteStatus::Started)
        }

        fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
            self.routes.get(route_id).cloned()
        }

        async fn start_all_routes(&mut self) -> Result<(), CamelError> {
            for status in self.routes.values_mut() {
                *status = RouteStatus::Started;
            }
            Ok(())
        }

        async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
            for status in self.routes.values_mut() {
                *status = RouteStatus::Stopped;
            }
            Ok(())
        }
    }

    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new(Arc::new(Mutex::new(MockRouteController::new())))
    }

    fn test_producer_ctx_with_route(id: &str, status: RouteStatus) -> ProducerContext {
        ProducerContext::new(Arc::new(Mutex::new(
            MockRouteController::new().with_route(id, status),
        )))
    }

    /// Build a producer for `uri`, panicking if the URI or producer is invalid.
    fn producer_for(ctx: &ProducerContext, uri: &str) -> BoxProcessor {
        ControlBusComponent::new()
            .create_endpoint(uri)
            .expect("endpoint URI should be valid")
            .create_producer(ctx)
            .expect("producer should be created")
    }

    /// Read a route's status straight from the controller behind `ctx`.
    async fn status_of(ctx: &ProducerContext, route_id: &str) -> Option<RouteStatus> {
        let controller = ctx.route_controller().clone();
        let guard = controller.lock().await;
        guard.route_status(route_id)
    }

    /// Return the text body of `exchange`, failing the test for any other body.
    fn text_body(exchange: &Exchange) -> &str {
        match &exchange.input.body {
            Body::Text(text) => text,
            _ => panic!("expected a text body"),
        }
    }

    #[test]
    fn test_endpoint_requires_action_for_route_command() {
        let comp = ControlBusComponent::new();
        let result = comp.create_endpoint("controlbus:route?routeId=foo");
        assert!(result.is_err(), "Should error when action is missing");
    }

    #[test]
    fn test_endpoint_rejects_unknown_action() {
        let comp = ControlBusComponent::new();
        let result = comp.create_endpoint("controlbus:route?routeId=foo&action=banana");
        assert!(result.is_err(), "Should error for unknown action");
    }

    #[test]
    fn test_parse_action_is_case_insensitive() {
        assert!(matches!(parse_action("START"), Ok(RouteAction::Start)));
        assert!(matches!(parse_action("Status"), Ok(RouteAction::Status)));
        assert!(parse_action("banana").is_err());
    }

    #[test]
    fn test_endpoint_parses_valid_uri() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=start")
            .unwrap();
        assert_eq!(endpoint.uri(), "controlbus:route?routeId=foo&action=start");
    }

    #[test]
    fn test_endpoint_returns_no_consumer() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=stop")
            .unwrap();
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn test_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=start")
            .unwrap();
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    #[test]
    fn test_component_scheme() {
        let comp = ControlBusComponent::new();
        assert_eq!(comp.scheme(), "controlbus");
    }

    #[tokio::test]
    async fn test_producer_start_route() {
        let ctx = test_producer_ctx_with_route("my-route", RouteStatus::Stopped);
        let producer = producer_for(&ctx, "controlbus:route?routeId=my-route&action=start");

        let result = producer
            .oneshot(Exchange::new(Message::default()))
            .await
            .unwrap();

        assert!(matches!(result.input.body, Body::Empty));
        // The action must actually reach the controller, not just clear the body.
        assert!(matches!(
            status_of(&ctx, "my-route").await,
            Some(RouteStatus::Started)
        ));
    }

    #[tokio::test]
    async fn test_producer_stop_route() {
        let ctx = test_producer_ctx_with_route("my-route", RouteStatus::Started);
        let producer = producer_for(&ctx, "controlbus:route?routeId=my-route&action=stop");

        let result = producer
            .oneshot(Exchange::new(Message::default()))
            .await
            .unwrap();

        assert!(matches!(result.input.body, Body::Empty));
        assert!(matches!(
            status_of(&ctx, "my-route").await,
            Some(RouteStatus::Stopped)
        ));
    }

    #[tokio::test]
    async fn test_producer_suspend_route() {
        let ctx = test_producer_ctx_with_route("my-route", RouteStatus::Started);
        let producer = producer_for(&ctx, "controlbus:route?routeId=my-route&action=suspend");

        let result = producer
            .oneshot(Exchange::new(Message::default()))
            .await
            .unwrap();

        assert!(matches!(result.input.body, Body::Empty));
        assert!(matches!(
            status_of(&ctx, "my-route").await,
            Some(RouteStatus::Suspended)
        ));
    }

    #[tokio::test]
    async fn test_producer_resume_route() {
        let ctx = test_producer_ctx_with_route("my-route", RouteStatus::Suspended);
        let producer = producer_for(&ctx, "controlbus:route?routeId=my-route&action=resume");

        let result = producer
            .oneshot(Exchange::new(Message::default()))
            .await
            .unwrap();

        assert!(matches!(result.input.body, Body::Empty));
        assert!(matches!(
            status_of(&ctx, "my-route").await,
            Some(RouteStatus::Started)
        ));
    }

    #[tokio::test]
    async fn test_producer_restart_route() {
        let ctx = test_producer_ctx_with_route("my-route", RouteStatus::Stopped);
        let producer = producer_for(&ctx, "controlbus:route?routeId=my-route&action=restart");

        let result = producer
            .oneshot(Exchange::new(Message::default()))
            .await
            .unwrap();

        assert!(matches!(result.input.body, Body::Empty));
        assert!(matches!(
            status_of(&ctx, "my-route").await,
            Some(RouteStatus::Started)
        ));
    }

    #[tokio::test]
    async fn test_producer_status_route() {
        let ctx = test_producer_ctx_with_route("my-route", RouteStatus::Started);
        let producer = producer_for(&ctx, "controlbus:route?routeId=my-route&action=status");

        let result = producer
            .oneshot(Exchange::new(Message::default()))
            .await
            .unwrap();

        assert_eq!(text_body(&result), "Started");
    }

    #[tokio::test]
    async fn test_producer_status_failed_route() {
        let ctx =
            test_producer_ctx_with_route("my-route", RouteStatus::Failed("error msg".to_string()));
        let producer = producer_for(&ctx, "controlbus:route?routeId=my-route&action=status");

        let result = producer
            .oneshot(Exchange::new(Message::default()))
            .await
            .unwrap();

        assert_eq!(text_body(&result), "Failed: error msg");
    }

    #[tokio::test]
    async fn test_producer_uses_header_route_id() {
        let ctx = test_producer_ctx_with_route("from-header", RouteStatus::Started);
        // No routeId in URI
        let producer = producer_for(&ctx, "controlbus:route?action=status");

        let mut exchange = Exchange::new(Message::default());
        exchange.input.set_header(
            "CamelRouteId",
            serde_json::Value::String("from-header".to_string()),
        );

        let result = producer.oneshot(exchange).await.unwrap();
        assert_eq!(text_body(&result), "Started");
    }

    #[tokio::test]
    async fn test_producer_uri_route_id_overrides_header() {
        // Both routes exist with different statuses, so the reported status
        // reveals which ID the producer actually used.
        let ctx = ProducerContext::new(Arc::new(Mutex::new(
            MockRouteController::new()
                .with_route("from-uri", RouteStatus::Started)
                .with_route("from-header", RouteStatus::Stopped),
        )));
        let producer = producer_for(&ctx, "controlbus:route?routeId=from-uri&action=status");

        let mut exchange = Exchange::new(Message::default());
        // Header has different route ID, but URI param should take precedence
        exchange.input.set_header(
            "CamelRouteId",
            serde_json::Value::String("from-header".to_string()),
        );

        let result = producer.oneshot(exchange).await.unwrap();
        assert_eq!(
            text_body(&result),
            "Started",
            "Should use URI routeId, not header"
        );
    }

    #[tokio::test]
    async fn test_producer_error_no_route_id() {
        let ctx = test_producer_ctx();
        let producer = producer_for(&ctx, "controlbus:route?action=status");

        let result = producer.oneshot(Exchange::new(Message::default())).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("routeId required"),
            "Error should mention routeId: {}",
            err
        );
    }

    #[tokio::test]
    async fn test_producer_error_route_not_found() {
        let ctx = test_producer_ctx(); // No routes
        let producer = producer_for(&ctx, "controlbus:route?routeId=nonexistent&action=status");

        let result = producer.oneshot(Exchange::new(Message::default())).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("not found"),
            "Error should mention not found: {}",
            err
        );
    }

    #[tokio::test]
    async fn test_producer_propagates_controller_error() {
        let ctx = test_producer_ctx(); // No routes
        let producer = producer_for(&ctx, "controlbus:route?routeId=nonexistent&action=start");

        let result = producer.oneshot(Exchange::new(Message::default())).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("not found"),
            "Error should mention not found: {}",
            err
        );
    }

    #[test]
    fn test_endpoint_parses_suspend_action() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=suspend")
            .unwrap();
        assert_eq!(
            endpoint.uri(),
            "controlbus:route?routeId=foo&action=suspend"
        );
    }

    #[test]
    fn test_endpoint_parses_resume_action() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=resume")
            .unwrap();
        assert_eq!(endpoint.uri(), "controlbus:route?routeId=foo&action=resume");
    }

    #[test]
    fn test_endpoint_parses_restart_action() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=restart")
            .unwrap();
        assert_eq!(
            endpoint.uri(),
            "controlbus:route?routeId=foo&action=restart"
        );
    }

    #[test]
    fn test_endpoint_rejects_unknown_command() {
        let comp = ControlBusComponent::new();
        let result = comp.create_endpoint("controlbus:unknown?action=start");
        assert!(result.is_err(), "Should error for unknown command");
    }
}