Skip to main content

camel_component_controlbus/
lib.rs

//! ControlBus component for managing route lifecycle.
//!
//! This component allows routes to control other routes in the Camel context
//! using the ControlBus EIP pattern. It provides the `start`, `stop`,
//! `suspend`, `resume`, and `restart` operations for routes, plus a `status`
//! query that writes the route status into the message body.
//!
//! # URI Format
//!
//! `controlbus:route?routeId=my-route&action=start`
//!
//! # Parameters
//!
//! - `routeId`: The ID of the route to operate on (optional; when omitted, the
//!   ID is read from the `CamelRouteId` exchange header; the URI parameter wins
//!   when both are present)
//! - `action`: The action to perform (required, case-insensitive): `start`,
//!   `stop`, `suspend`, `resume`, `restart`, `status`
//!
//! # Example
//!
//! ```ignore
//! // Start a route
//! from("timer:start?period=60000")
//!     .to("controlbus:route?routeId=my-route&action=start");
//!
//! // Get route status
//! from("direct:getStatus")
//!     .to("controlbus:route?routeId=my-route&action=status")
//!     .log("Status: ${body}");
//! ```
28
29use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33
34#[cfg(test)]
35use async_trait::async_trait;
36#[cfg(test)]
37use tokio::sync::Mutex;
38use tower::Service;
39use tracing::debug;
40
41#[cfg(test)]
42use camel_api::RouteStatus;
43use camel_api::{
44    Body, BoxProcessor, CamelError, Exchange, RouteAction, RuntimeCommand, RuntimeQuery,
45    RuntimeQueryResult,
46};
47use camel_component::{Component, Consumer, Endpoint, ProducerContext};
48use camel_endpoint::parse_uri;
49
50// ---------------------------------------------------------------------------
51// ControlBusComponent
52// ---------------------------------------------------------------------------
53
/// The ControlBus component for managing route lifecycle.
///
/// The type carries no state: everything an endpoint needs is parsed from the
/// URI handed to `create_endpoint`, so instances are interchangeable.
#[derive(Debug, Clone, Default)]
pub struct ControlBusComponent;

impl ControlBusComponent {
    /// Create a new ControlBus component.
    ///
    /// Equivalent to [`ControlBusComponent::default`]; usable in `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
69
70impl Component for ControlBusComponent {
71    fn scheme(&self) -> &str {
72        "controlbus"
73    }
74
75    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
76        let parts = parse_uri(uri)?;
77
78        if parts.scheme != "controlbus" {
79            return Err(CamelError::InvalidUri(format!(
80                "expected scheme 'controlbus', got '{}'",
81                parts.scheme
82            )));
83        }
84
85        // Parse command (path portion after controlbus:)
86        let command = parts.path.clone();
87
88        // Validate command - only "route" is supported
89        if command != "route" {
90            return Err(CamelError::EndpointCreationFailed(format!(
91                "controlbus: unknown command '{}', only 'route' is supported",
92                command
93            )));
94        }
95
96        // Parse routeId parameter
97        let route_id = parts.params.get("routeId").cloned();
98
99        // Parse action parameter
100        let action = if let Some(action_str) = parts.params.get("action") {
101            Some(parse_action(action_str)?)
102        } else {
103            None
104        };
105
106        // Validate: for "route" command, action is required
107        if command == "route" && action.is_none() {
108            return Err(CamelError::EndpointCreationFailed(
109                "controlbus: 'action' parameter is required for route command".to_string(),
110            ));
111        }
112
113        Ok(Box::new(ControlBusEndpoint {
114            uri: uri.to_string(),
115            route_id,
116            action,
117        }))
118    }
119}
120
121/// Parse an action string into a RouteAction.
122fn parse_action(s: &str) -> Result<RouteAction, CamelError> {
123    match s.to_lowercase().as_str() {
124        "start" => Ok(RouteAction::Start),
125        "stop" => Ok(RouteAction::Stop),
126        "suspend" => Ok(RouteAction::Suspend),
127        "resume" => Ok(RouteAction::Resume),
128        "restart" => Ok(RouteAction::Restart),
129        "status" => Ok(RouteAction::Status),
130        _ => Err(CamelError::EndpointCreationFailed(format!(
131            "controlbus: unknown action '{}'",
132            s
133        ))),
134    }
135}
136
137// ---------------------------------------------------------------------------
138// ControlBusEndpoint
139// ---------------------------------------------------------------------------
140
141/// Endpoint for the ControlBus component.
142struct ControlBusEndpoint {
143    uri: String,
144    route_id: Option<String>,
145    action: Option<RouteAction>,
146}
147
148impl Endpoint for ControlBusEndpoint {
149    fn uri(&self) -> &str {
150        &self.uri
151    }
152
153    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
154        Err(CamelError::EndpointCreationFailed(
155            "controlbus endpoint does not support consumers".to_string(),
156        ))
157    }
158
159    fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
160        let action = self.action.clone().ok_or_else(|| {
161            CamelError::EndpointCreationFailed(
162                "controlbus: action is required to create producer".to_string(),
163            )
164        })?;
165        let runtime = ctx.runtime().cloned().ok_or_else(|| {
166            CamelError::EndpointCreationFailed(
167                "controlbus: runtime handle is required in ProducerContext".to_string(),
168            )
169        })?;
170
171        Ok(BoxProcessor::new(ControlBusProducer {
172            route_id: self.route_id.clone(),
173            action,
174            runtime,
175        }))
176    }
177}
178
179// ---------------------------------------------------------------------------
180// ControlBusProducer
181// ---------------------------------------------------------------------------
182
183/// Producer that executes control bus actions on routes.
184#[derive(Clone)]
185struct ControlBusProducer {
186    /// Route ID from URI params (may be None, in which case header is used).
187    route_id: Option<String>,
188    /// Action to perform on the route.
189    action: RouteAction,
190    /// Runtime command/query handle.
191    runtime: Arc<dyn camel_api::RuntimeHandle>,
192}
193
194impl Service<Exchange> for ControlBusProducer {
195    type Response = Exchange;
196    type Error = CamelError;
197    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
198
199    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
200        Poll::Ready(Ok(()))
201    }
202
203    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
204        // Get route_id: prefer field, fallback to header "CamelRouteId"
205        let route_id = self.route_id.clone().or_else(|| {
206            exchange
207                .input
208                .header("CamelRouteId")
209                .and_then(|v| v.as_str().map(|s| s.to_string()))
210        });
211
212        // If no route_id → error
213        let route_id = match route_id {
214            Some(id) => id,
215            None => {
216                return Box::pin(async {
217                    Err(CamelError::ProcessorError(
218                        "controlbus: routeId required (set via URI param or CamelRouteId header)"
219                            .into(),
220                    ))
221                });
222            }
223        };
224
225        let action = self.action.clone();
226        let runtime = self.runtime.clone();
227        let command_scope = format!("controlbus:{route_id}:{}", exchange.correlation_id());
228
229        Box::pin(async move {
230            debug!(
231                route_id = %route_id,
232                action = ?action,
233                "ControlBus executing action"
234            );
235
236            match execute_runtime_action(runtime.as_ref(), &route_id, &action, &command_scope)
237                .await?
238            {
239                Some(status) => {
240                    exchange.input.body = Body::Text(status);
241                    Ok(exchange)
242                }
243                None => {
244                    exchange.input.body = Body::Empty;
245                    Ok(exchange)
246                }
247            }
248        })
249    }
250}
251
252async fn execute_runtime_action(
253    runtime: &dyn camel_api::RuntimeHandle,
254    route_id: &str,
255    action: &RouteAction,
256    command_scope: &str,
257) -> Result<Option<String>, CamelError> {
258    match action {
259        RouteAction::Start => {
260            runtime
261                .execute(RuntimeCommand::StartRoute {
262                    route_id: route_id.to_string(),
263                    command_id: command_id(command_scope, "start"),
264                    causation_id: None,
265                })
266                .await?;
267            Ok(None)
268        }
269        RouteAction::Stop => {
270            runtime
271                .execute(RuntimeCommand::StopRoute {
272                    route_id: route_id.to_string(),
273                    command_id: command_id(command_scope, "stop"),
274                    causation_id: None,
275                })
276                .await?;
277            Ok(None)
278        }
279        RouteAction::Suspend => {
280            runtime
281                .execute(RuntimeCommand::SuspendRoute {
282                    route_id: route_id.to_string(),
283                    command_id: command_id(command_scope, "suspend"),
284                    causation_id: None,
285                })
286                .await?;
287            Ok(None)
288        }
289        RouteAction::Resume => {
290            runtime
291                .execute(RuntimeCommand::ResumeRoute {
292                    route_id: route_id.to_string(),
293                    command_id: command_id(command_scope, "resume"),
294                    causation_id: None,
295                })
296                .await?;
297            Ok(None)
298        }
299        RouteAction::Restart => {
300            runtime
301                .execute(RuntimeCommand::ReloadRoute {
302                    route_id: route_id.to_string(),
303                    command_id: command_id(command_scope, "restart"),
304                    causation_id: None,
305                })
306                .await?;
307            Ok(None)
308        }
309        RouteAction::Status => match runtime
310            .ask(RuntimeQuery::GetRouteStatus {
311                route_id: route_id.to_string(),
312            })
313            .await?
314        {
315            RuntimeQueryResult::RouteStatus { status, .. } => Ok(Some(status)),
316            _ => Err(CamelError::ProcessorError(
317                "controlbus: runtime returned unexpected response for route status".to_string(),
318            )),
319        },
320    }
321}
322
/// Builds a command identifier of the form `controlbus:<route_id>:<operation>`.
///
/// NOTE(review): despite the parameter name, the caller in this file passes a
/// command scope that already starts with `controlbus:`, so the resulting IDs
/// carry the prefix twice. Confirm whether that is intended before changing
/// the format.
fn command_id(route_id: &str, operation: &str) -> String {
    ["controlbus", route_id, operation].join(":")
}
326
327// ---------------------------------------------------------------------------
328// Tests
329// ---------------------------------------------------------------------------
330
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::Message;
    use tower::ServiceExt;

    // -----------------------------------------------------------------------
    // Test doubles
    // -----------------------------------------------------------------------

    /// In-memory runtime: answers status queries from a fixed table and
    /// records every command it receives as an `"<op>:<route_id>"` marker.
    struct MockRuntime {
        statuses: std::collections::HashMap<String, String>,
        commands: Arc<Mutex<Vec<String>>>,
    }

    impl MockRuntime {
        /// Runtime that knows no routes and has recorded no commands.
        fn new() -> Self {
            Self {
                statuses: std::collections::HashMap::new(),
                commands: Arc::new(Mutex::new(Vec::new())),
            }
        }

        /// Registers `status` as the answer to status queries for `route_id`.
        fn with_status(mut self, route_id: &str, status: &str) -> Self {
            self.statuses
                .insert(route_id.to_string(), status.to_string());
            self
        }

        /// Shared handle to the command log, usable after the runtime is moved.
        fn commands(&self) -> Arc<Mutex<Vec<String>>> {
            Arc::clone(&self.commands)
        }
    }

    #[async_trait]
    impl camel_api::RuntimeCommandBus for MockRuntime {
        /// Records a marker for the command and always accepts it.
        async fn execute(
            &self,
            cmd: camel_api::RuntimeCommand,
        ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
            let marker = match cmd {
                camel_api::RuntimeCommand::RegisterRoute { .. } => "register".to_string(),
                camel_api::RuntimeCommand::StartRoute { route_id, .. } => {
                    format!("start:{route_id}")
                }
                camel_api::RuntimeCommand::StopRoute { route_id, .. } => {
                    format!("stop:{route_id}")
                }
                camel_api::RuntimeCommand::SuspendRoute { route_id, .. } => {
                    format!("suspend:{route_id}")
                }
                camel_api::RuntimeCommand::ResumeRoute { route_id, .. } => {
                    format!("resume:{route_id}")
                }
                camel_api::RuntimeCommand::ReloadRoute { route_id, .. } => {
                    format!("reload:{route_id}")
                }
                camel_api::RuntimeCommand::FailRoute { route_id, .. } => {
                    format!("fail:{route_id}")
                }
                camel_api::RuntimeCommand::RemoveRoute { route_id, .. } => {
                    format!("remove:{route_id}")
                }
            };
            self.commands.lock().await.push(marker);
            Ok(camel_api::RuntimeCommandResult::Accepted)
        }
    }

    #[async_trait]
    impl camel_api::RuntimeQueryBus for MockRuntime {
        /// Answers `GetRouteStatus` from the status table; rejects other queries.
        async fn ask(
            &self,
            query: camel_api::RuntimeQuery,
        ) -> Result<camel_api::RuntimeQueryResult, CamelError> {
            match query {
                camel_api::RuntimeQuery::GetRouteStatus { route_id } => {
                    let status = self.statuses.get(&route_id).ok_or_else(|| {
                        CamelError::ProcessorError(format!(
                            "runtime: route '{}' not found",
                            route_id
                        ))
                    })?;
                    Ok(camel_api::RuntimeQueryResult::RouteStatus {
                        route_id,
                        status: status.clone(),
                    })
                }
                _ => Err(CamelError::ProcessorError(
                    "runtime: unsupported query in test".to_string(),
                )),
            }
        }
    }

    // -----------------------------------------------------------------------
    // Fixtures
    // -----------------------------------------------------------------------

    /// Producer context backed by a runtime that knows no routes.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new().with_runtime(Arc::new(MockRuntime::new()))
    }

    /// Producer context whose runtime reports `status` for route `id`.
    fn test_producer_ctx_with_route(id: &str, status: RouteStatus) -> ProducerContext {
        test_producer_ctx_with_runtime_status(id, &runtime_status_for(&status))
    }

    /// Producer context whose runtime reports the raw `status` string for `route_id`.
    fn test_producer_ctx_with_runtime_status(route_id: &str, status: &str) -> ProducerContext {
        ProducerContext::new()
            .with_runtime(Arc::new(MockRuntime::new().with_status(route_id, status)))
    }

    /// Status string the mock runtime should report for a `RouteStatus`.
    fn runtime_status_for(status: &RouteStatus) -> String {
        match status {
            RouteStatus::Stopped => "Stopped".to_string(),
            RouteStatus::Starting => "Starting".to_string(),
            RouteStatus::Started => "Started".to_string(),
            RouteStatus::Stopping => "Stopping".to_string(),
            RouteStatus::Suspended => "Suspended".to_string(),
            RouteStatus::Failed(msg) => format!("Failed: {msg}"),
        }
    }

    /// Creates the endpoint for `uri` and its producer; panics if either step fails.
    fn producer_for(uri: &str, ctx: &ProducerContext) -> BoxProcessor {
        let endpoint = ControlBusComponent::new().create_endpoint(uri).unwrap();
        endpoint.create_producer(ctx).unwrap()
    }

    /// Returns the text body of `exchange`, failing the test for any other body
    /// kind so a wrong body can never slip through unasserted.
    fn text_body(exchange: &Exchange) -> &str {
        match &exchange.input.body {
            Body::Text(text) => text.as_str(),
            _ => panic!("expected the exchange body to be Body::Text"),
        }
    }

    /// Runs a lifecycle `action` against "my-route" and returns the resulting
    /// exchange together with the commands the runtime recorded.
    async fn run_lifecycle_action(action: &str) -> (Exchange, Vec<String>) {
        let runtime = Arc::new(MockRuntime::new().with_status("my-route", "Started"));
        let commands = runtime.commands();
        let ctx = ProducerContext::new().with_runtime(runtime);
        let uri = format!("controlbus:route?routeId=my-route&action={action}");
        let producer = producer_for(&uri, &ctx);

        let exchange = Exchange::new(Message::default());
        let result = producer.oneshot(exchange).await.unwrap();
        let recorded = commands.lock().await.clone();
        (result, recorded)
    }

    // -----------------------------------------------------------------------
    // Component / endpoint
    // -----------------------------------------------------------------------

    #[test]
    fn test_component_scheme() {
        let comp = ControlBusComponent::new();
        assert_eq!(comp.scheme(), "controlbus");
    }

    #[test]
    fn test_endpoint_requires_action_for_route_command() {
        let comp = ControlBusComponent::new();
        let result = comp.create_endpoint("controlbus:route?routeId=foo");
        assert!(result.is_err(), "Should error when action is missing");
    }

    #[test]
    fn test_endpoint_rejects_unknown_action() {
        let comp = ControlBusComponent::new();
        let result = comp.create_endpoint("controlbus:route?routeId=foo&action=banana");
        assert!(result.is_err(), "Should error for unknown action");
    }

    #[test]
    fn test_endpoint_rejects_unknown_command() {
        let comp = ControlBusComponent::new();
        let result = comp.create_endpoint("controlbus:unknown?action=start");
        assert!(result.is_err(), "Should error for unknown command");
    }

    #[test]
    fn test_endpoint_parses_valid_uri() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=start")
            .unwrap();
        assert_eq!(endpoint.uri(), "controlbus:route?routeId=foo&action=start");
    }

    #[test]
    fn test_endpoint_parses_suspend_action() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=suspend")
            .unwrap();
        assert_eq!(
            endpoint.uri(),
            "controlbus:route?routeId=foo&action=suspend"
        );
    }

    #[test]
    fn test_endpoint_parses_resume_action() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=resume")
            .unwrap();
        assert_eq!(endpoint.uri(), "controlbus:route?routeId=foo&action=resume");
    }

    #[test]
    fn test_endpoint_parses_restart_action() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=restart")
            .unwrap();
        assert_eq!(
            endpoint.uri(),
            "controlbus:route?routeId=foo&action=restart"
        );
    }

    #[test]
    fn test_endpoint_accepts_mixed_case_action() {
        // `parse_action` lowercases its input, so the action is case-insensitive.
        let comp = ControlBusComponent::new();
        let result = comp.create_endpoint("controlbus:route?routeId=foo&action=START");
        assert!(result.is_ok(), "Action matching should ignore case");
    }

    #[test]
    fn test_endpoint_returns_no_consumer() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=stop")
            .unwrap();
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn test_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=start")
            .unwrap();
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    // -----------------------------------------------------------------------
    // Producer: lifecycle commands
    // -----------------------------------------------------------------------

    #[tokio::test]
    async fn test_producer_start_route() {
        let (result, recorded) = run_lifecycle_action("start").await;
        assert!(matches!(result.input.body, Body::Empty));
        assert_eq!(recorded, vec!["start:my-route".to_string()]);
    }

    #[tokio::test]
    async fn test_producer_stop_route() {
        let (result, recorded) = run_lifecycle_action("stop").await;
        assert!(matches!(result.input.body, Body::Empty));
        assert_eq!(recorded, vec!["stop:my-route".to_string()]);
    }

    #[tokio::test]
    async fn test_producer_suspend_route() {
        let (result, recorded) = run_lifecycle_action("suspend").await;
        assert!(matches!(result.input.body, Body::Empty));
        assert_eq!(recorded, vec!["suspend:my-route".to_string()]);
    }

    #[tokio::test]
    async fn test_producer_resume_route() {
        let (result, recorded) = run_lifecycle_action("resume").await;
        assert!(matches!(result.input.body, Body::Empty));
        assert_eq!(recorded, vec!["resume:my-route".to_string()]);
    }

    #[tokio::test]
    async fn test_producer_restart_maps_to_runtime_reload_command() {
        let (result, recorded) = run_lifecycle_action("restart").await;
        assert!(matches!(result.input.body, Body::Empty));
        assert_eq!(recorded, vec!["reload:my-route".to_string()]);
    }

    // -----------------------------------------------------------------------
    // Producer: status query
    // -----------------------------------------------------------------------

    #[tokio::test]
    async fn test_producer_status_route() {
        let ctx = test_producer_ctx_with_route("my-route", RouteStatus::Started);
        let producer = producer_for("controlbus:route?routeId=my-route&action=status", &ctx);

        let exchange = Exchange::new(Message::default());
        let result = producer.oneshot(exchange).await.unwrap();
        assert_eq!(text_body(&result), "Started");
    }

    #[tokio::test]
    async fn test_producer_status_failed_route() {
        let ctx =
            test_producer_ctx_with_route("my-route", RouteStatus::Failed("error msg".to_string()));
        let producer = producer_for("controlbus:route?routeId=my-route&action=status", &ctx);

        let exchange = Exchange::new(Message::default());
        let result = producer.oneshot(exchange).await.unwrap();
        assert_eq!(text_body(&result), "Failed: error msg");
    }

    #[tokio::test]
    async fn test_producer_status_uses_runtime_when_available() {
        let ctx = test_producer_ctx_with_runtime_status("runtime-route", "Started");
        let producer = producer_for(
            "controlbus:route?routeId=runtime-route&action=status",
            &ctx,
        );

        let exchange = Exchange::new(Message::default());
        let result = producer.oneshot(exchange).await.unwrap();
        assert_eq!(text_body(&result), "Started");
    }

    #[tokio::test]
    async fn test_producer_status_errors_when_runtime_route_is_missing() {
        let ctx = test_producer_ctx();
        let producer = producer_for("controlbus:route?routeId=my-route&action=status", &ctx);

        let exchange = Exchange::new(Message::default());
        let err = producer.oneshot(exchange).await.unwrap_err().to_string();
        assert!(
            err.contains("not found"),
            "runtime miss should not fallback to controller: {err}"
        );
    }

    #[tokio::test]
    async fn test_producer_error_route_not_found() {
        let ctx = test_producer_ctx(); // No routes
        let producer = producer_for(
            "controlbus:route?routeId=nonexistent&action=status",
            &ctx,
        );

        let exchange = Exchange::new(Message::default());
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("not found"),
            "Error should mention not found: {}",
            err
        );
    }

    // -----------------------------------------------------------------------
    // Producer: route ID resolution
    // -----------------------------------------------------------------------

    #[tokio::test]
    async fn test_producer_uses_header_route_id() {
        let ctx = test_producer_ctx_with_route("from-header", RouteStatus::Started);
        // No routeId in URI
        let producer = producer_for("controlbus:route?action=status", &ctx);

        let mut exchange = Exchange::new(Message::default());
        exchange.input.set_header(
            "CamelRouteId",
            serde_json::Value::String("from-header".to_string()),
        );

        let result = producer.oneshot(exchange).await.unwrap();
        assert_eq!(text_body(&result), "Started");
    }

    #[tokio::test]
    async fn test_producer_uri_route_id_overrides_header() {
        // The runtime only knows "from-uri"; using the header would fail the lookup.
        let ctx = test_producer_ctx_with_route("from-uri", RouteStatus::Started);
        let producer = producer_for("controlbus:route?routeId=from-uri&action=status", &ctx);

        let mut exchange = Exchange::new(Message::default());
        // Header has different route ID, but URI param should take precedence
        exchange.input.set_header(
            "CamelRouteId",
            serde_json::Value::String("from-header".to_string()),
        );

        let result = producer.oneshot(exchange).await.unwrap();
        assert_eq!(
            text_body(&result),
            "Started",
            "Should use URI routeId, not header"
        );
    }

    #[tokio::test]
    async fn test_producer_error_no_route_id() {
        let ctx = test_producer_ctx();
        let producer = producer_for("controlbus:route?action=status", &ctx);

        let exchange = Exchange::new(Message::default());
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("routeId required"),
            "Error should mention routeId: {}",
            err
        );
    }
}