1use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33
34#[cfg(test)]
35use async_trait::async_trait;
36#[cfg(test)]
37use tokio::sync::Mutex;
38use tower::Service;
39use tracing::debug;
40
41#[cfg(test)]
42use camel_api::RouteStatus;
43use camel_api::{
44 Body, BoxProcessor, CamelError, Exchange, RouteAction, RuntimeCommand, RuntimeQuery,
45 RuntimeQueryResult,
46};
47use camel_component::{Component, Consumer, Endpoint, ProducerContext};
48use camel_endpoint::parse_uri;
49
/// Component for the `controlbus` URI scheme.
///
/// The type carries no state; its `Component` implementation validates
/// `controlbus:route?...` URIs and produces the matching endpoint.
pub struct ControlBusComponent;
56
57impl ControlBusComponent {
58 pub fn new() -> Self {
60 Self
61 }
62}
63
64impl Default for ControlBusComponent {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl Component for ControlBusComponent {
71 fn scheme(&self) -> &str {
72 "controlbus"
73 }
74
75 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
76 let parts = parse_uri(uri)?;
77
78 if parts.scheme != "controlbus" {
79 return Err(CamelError::InvalidUri(format!(
80 "expected scheme 'controlbus', got '{}'",
81 parts.scheme
82 )));
83 }
84
85 let command = parts.path.clone();
87
88 if command != "route" {
90 return Err(CamelError::EndpointCreationFailed(format!(
91 "controlbus: unknown command '{}', only 'route' is supported",
92 command
93 )));
94 }
95
96 let route_id = parts.params.get("routeId").cloned();
98
99 let action = if let Some(action_str) = parts.params.get("action") {
101 Some(parse_action(action_str)?)
102 } else {
103 None
104 };
105
106 if command == "route" && action.is_none() {
108 return Err(CamelError::EndpointCreationFailed(
109 "controlbus: 'action' parameter is required for route command".to_string(),
110 ));
111 }
112
113 Ok(Box::new(ControlBusEndpoint {
114 uri: uri.to_string(),
115 route_id,
116 action,
117 }))
118 }
119}
120
121fn parse_action(s: &str) -> Result<RouteAction, CamelError> {
123 match s.to_lowercase().as_str() {
124 "start" => Ok(RouteAction::Start),
125 "stop" => Ok(RouteAction::Stop),
126 "suspend" => Ok(RouteAction::Suspend),
127 "resume" => Ok(RouteAction::Resume),
128 "restart" => Ok(RouteAction::Restart),
129 "status" => Ok(RouteAction::Status),
130 _ => Err(CamelError::EndpointCreationFailed(format!(
131 "controlbus: unknown action '{}'",
132 s
133 ))),
134 }
135}
136
/// Endpoint produced by `ControlBusComponent::create_endpoint`.
struct ControlBusEndpoint {
    /// The URI string this endpoint was created from, returned by `uri()`.
    uri: String,
    /// Value of the `routeId` URI parameter, if one was supplied.
    route_id: Option<String>,
    /// Action parsed from the `action` URI parameter.
    action: Option<RouteAction>,
}
147
148impl Endpoint for ControlBusEndpoint {
149 fn uri(&self) -> &str {
150 &self.uri
151 }
152
153 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
154 Err(CamelError::EndpointCreationFailed(
155 "controlbus endpoint does not support consumers".to_string(),
156 ))
157 }
158
159 fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
160 let action = self.action.clone().ok_or_else(|| {
161 CamelError::EndpointCreationFailed(
162 "controlbus: action is required to create producer".to_string(),
163 )
164 })?;
165 let runtime = ctx.runtime().cloned().ok_or_else(|| {
166 CamelError::EndpointCreationFailed(
167 "controlbus: runtime handle is required in ProducerContext".to_string(),
168 )
169 })?;
170
171 Ok(BoxProcessor::new(ControlBusProducer {
172 route_id: self.route_id.clone(),
173 action,
174 runtime,
175 }))
176 }
177}
178
/// Producer that executes one route action against the runtime for every
/// exchange it receives.
#[derive(Clone)]
struct ControlBusProducer {
    /// Route id copied from the endpoint; when `None`, the `CamelRouteId`
    /// header of the incoming exchange is consulted instead.
    route_id: Option<String>,
    /// Action to execute for each exchange.
    action: RouteAction,
    /// Handle used to send commands and queries to the runtime.
    runtime: Arc<dyn camel_api::RuntimeHandle>,
}
193
194impl Service<Exchange> for ControlBusProducer {
195 type Response = Exchange;
196 type Error = CamelError;
197 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
198
199 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
200 Poll::Ready(Ok(()))
201 }
202
203 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
204 let route_id = self.route_id.clone().or_else(|| {
206 exchange
207 .input
208 .header("CamelRouteId")
209 .and_then(|v| v.as_str().map(|s| s.to_string()))
210 });
211
212 let route_id = match route_id {
214 Some(id) => id,
215 None => {
216 return Box::pin(async {
217 Err(CamelError::ProcessorError(
218 "controlbus: routeId required (set via URI param or CamelRouteId header)"
219 .into(),
220 ))
221 });
222 }
223 };
224
225 let action = self.action.clone();
226 let runtime = self.runtime.clone();
227 let command_scope = format!("controlbus:{route_id}:{}", exchange.correlation_id());
228
229 Box::pin(async move {
230 debug!(
231 route_id = %route_id,
232 action = ?action,
233 "ControlBus executing action"
234 );
235
236 match execute_runtime_action(runtime.as_ref(), &route_id, &action, &command_scope)
237 .await?
238 {
239 Some(status) => {
240 exchange.input.body = Body::Text(status);
241 Ok(exchange)
242 }
243 None => {
244 exchange.input.body = Body::Empty;
245 Ok(exchange)
246 }
247 }
248 })
249 }
250}
251
252async fn execute_runtime_action(
253 runtime: &dyn camel_api::RuntimeHandle,
254 route_id: &str,
255 action: &RouteAction,
256 command_scope: &str,
257) -> Result<Option<String>, CamelError> {
258 match action {
259 RouteAction::Start => {
260 runtime
261 .execute(RuntimeCommand::StartRoute {
262 route_id: route_id.to_string(),
263 command_id: command_id(command_scope, "start"),
264 causation_id: None,
265 })
266 .await?;
267 Ok(None)
268 }
269 RouteAction::Stop => {
270 runtime
271 .execute(RuntimeCommand::StopRoute {
272 route_id: route_id.to_string(),
273 command_id: command_id(command_scope, "stop"),
274 causation_id: None,
275 })
276 .await?;
277 Ok(None)
278 }
279 RouteAction::Suspend => {
280 runtime
281 .execute(RuntimeCommand::SuspendRoute {
282 route_id: route_id.to_string(),
283 command_id: command_id(command_scope, "suspend"),
284 causation_id: None,
285 })
286 .await?;
287 Ok(None)
288 }
289 RouteAction::Resume => {
290 runtime
291 .execute(RuntimeCommand::ResumeRoute {
292 route_id: route_id.to_string(),
293 command_id: command_id(command_scope, "resume"),
294 causation_id: None,
295 })
296 .await?;
297 Ok(None)
298 }
299 RouteAction::Restart => {
300 runtime
301 .execute(RuntimeCommand::ReloadRoute {
302 route_id: route_id.to_string(),
303 command_id: command_id(command_scope, "restart"),
304 causation_id: None,
305 })
306 .await?;
307 Ok(None)
308 }
309 RouteAction::Status => match runtime
310 .ask(RuntimeQuery::GetRouteStatus {
311 route_id: route_id.to_string(),
312 })
313 .await?
314 {
315 RuntimeQueryResult::RouteStatus { status, .. } => Ok(Some(status)),
316 _ => Err(CamelError::ProcessorError(
317 "controlbus: runtime returned unexpected response for route status".to_string(),
318 )),
319 },
320 }
321}
322
/// Builds a command id of the form `controlbus:<route_id>:<operation>`.
///
/// The first argument is inserted verbatim; `execute_runtime_action` passes
/// its `command_scope` string here rather than a bare route id.
fn command_id(route_id: &str, operation: &str) -> String {
    ["controlbus", route_id, operation].join(":")
}
326
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::Message;
    use tower::ServiceExt;

    /// Runtime double: answers status queries from a fixed map and records a
    /// `"<operation>:<route_id>"` marker for every command it receives.
    struct MockRuntime {
        statuses: std::collections::HashMap<String, String>,
        commands: Arc<Mutex<Vec<String>>>,
    }

    impl MockRuntime {
        fn new() -> Self {
            Self {
                statuses: std::collections::HashMap::new(),
                commands: Arc::new(Mutex::new(Vec::new())),
            }
        }

        /// Registers the status string reported for `route_id`.
        fn with_status(mut self, route_id: &str, status: &str) -> Self {
            self.statuses
                .insert(route_id.to_string(), status.to_string());
            self
        }

        /// Shared handle to the list of recorded command markers.
        fn commands(&self) -> Arc<Mutex<Vec<String>>> {
            Arc::clone(&self.commands)
        }
    }

    #[async_trait]
    impl camel_api::RuntimeCommandBus for MockRuntime {
        async fn execute(
            &self,
            cmd: camel_api::RuntimeCommand,
        ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
            let marker = match cmd {
                camel_api::RuntimeCommand::RegisterRoute { .. } => "register".to_string(),
                camel_api::RuntimeCommand::StartRoute { route_id, .. } => {
                    format!("start:{route_id}")
                }
                camel_api::RuntimeCommand::StopRoute { route_id, .. } => {
                    format!("stop:{route_id}")
                }
                camel_api::RuntimeCommand::SuspendRoute { route_id, .. } => {
                    format!("suspend:{route_id}")
                }
                camel_api::RuntimeCommand::ResumeRoute { route_id, .. } => {
                    format!("resume:{route_id}")
                }
                camel_api::RuntimeCommand::ReloadRoute { route_id, .. } => {
                    format!("reload:{route_id}")
                }
                camel_api::RuntimeCommand::FailRoute { route_id, .. } => format!("fail:{route_id}"),
                camel_api::RuntimeCommand::RemoveRoute { route_id, .. } => {
                    format!("remove:{route_id}")
                }
            };
            self.commands.lock().await.push(marker);
            Ok(camel_api::RuntimeCommandResult::Accepted)
        }
    }

    #[async_trait]
    impl camel_api::RuntimeQueryBus for MockRuntime {
        async fn ask(
            &self,
            query: camel_api::RuntimeQuery,
        ) -> Result<camel_api::RuntimeQueryResult, CamelError> {
            match query {
                camel_api::RuntimeQuery::GetRouteStatus { route_id } => {
                    let status = self.statuses.get(&route_id).ok_or_else(|| {
                        CamelError::ProcessorError(format!(
                            "runtime: route '{}' not found",
                            route_id
                        ))
                    })?;
                    Ok(camel_api::RuntimeQueryResult::RouteStatus {
                        route_id,
                        status: status.clone(),
                    })
                }
                _ => Err(CamelError::ProcessorError(
                    "runtime: unsupported query in test".to_string(),
                )),
            }
        }
    }

    /// Context backed by a runtime that knows no routes.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new().with_runtime(Arc::new(MockRuntime::new()))
    }

    /// Context whose runtime reports `status` (rendered as text) for route `id`.
    fn test_producer_ctx_with_route(id: &str, status: RouteStatus) -> ProducerContext {
        test_producer_ctx_with_runtime_status(id, &runtime_status_for(&status))
    }

    /// Context whose runtime reports the raw `status` string for `route_id`.
    fn test_producer_ctx_with_runtime_status(route_id: &str, status: &str) -> ProducerContext {
        ProducerContext::new()
            .with_runtime(Arc::new(MockRuntime::new().with_status(route_id, status)))
    }

    /// Same as [`test_producer_ctx`]; kept under this name so tests read clearly.
    fn test_producer_ctx_with_empty_runtime() -> ProducerContext {
        test_producer_ctx()
    }

    /// Context plus a handle to the commands its runtime records.
    fn recording_ctx(route_id: &str, status: &str) -> (ProducerContext, Arc<Mutex<Vec<String>>>) {
        let runtime = Arc::new(MockRuntime::new().with_status(route_id, status));
        let commands = runtime.commands();
        (ProducerContext::new().with_runtime(runtime), commands)
    }

    /// Text form of a [`RouteStatus`] as used by the mock runtime in these tests.
    fn runtime_status_for(status: &RouteStatus) -> String {
        match status {
            RouteStatus::Stopped => "Stopped".to_string(),
            RouteStatus::Starting => "Starting".to_string(),
            RouteStatus::Started => "Started".to_string(),
            RouteStatus::Stopping => "Stopping".to_string(),
            RouteStatus::Suspended => "Suspended".to_string(),
            RouteStatus::Failed(msg) => format!("Failed: {msg}"),
        }
    }

    /// Exchange with a default message and no headers set by the test.
    fn blank_exchange() -> Exchange {
        Exchange::new(Message::default())
    }

    /// Creates the endpoint for `uri`, builds its producer from `ctx`, and sends
    /// `exchange` through it. Endpoint or producer creation failures panic, so
    /// the returned `Result` only reflects the producer call itself.
    async fn send(
        ctx: &ProducerContext,
        uri: &str,
        exchange: Exchange,
    ) -> Result<Exchange, CamelError> {
        let endpoint = ControlBusComponent::new().create_endpoint(uri).unwrap();
        let producer = endpoint.create_producer(ctx).unwrap();
        producer.oneshot(exchange).await
    }

    /// Returns the text body of `exchange`, panicking when the body is not text
    /// so that a wrong body kind can never pass silently.
    fn expect_text_body(exchange: &Exchange) -> &str {
        match &exchange.input.body {
            Body::Text(text) => text.as_str(),
            _ => panic!("expected the exchange body to be Body::Text"),
        }
    }

    #[test]
    fn test_endpoint_requires_action_for_route_command() {
        let comp = ControlBusComponent::new();
        let result = comp.create_endpoint("controlbus:route?routeId=foo");
        assert!(result.is_err(), "Should error when action is missing");
    }

    #[test]
    fn test_endpoint_rejects_unknown_action() {
        let comp = ControlBusComponent::new();
        let result = comp.create_endpoint("controlbus:route?routeId=foo&action=banana");
        assert!(result.is_err(), "Should error for unknown action");
    }

    #[test]
    fn test_endpoint_parses_valid_uri() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=start")
            .unwrap();
        assert_eq!(endpoint.uri(), "controlbus:route?routeId=foo&action=start");
    }

    #[test]
    fn test_endpoint_returns_no_consumer() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=stop")
            .unwrap();
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn test_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=start")
            .unwrap();
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    #[test]
    fn test_component_scheme() {
        let comp = ControlBusComponent::new();
        assert_eq!(comp.scheme(), "controlbus");
    }

    #[tokio::test]
    async fn test_producer_start_route() {
        let (ctx, commands) = recording_ctx("my-route", "Stopped");

        let result = send(
            &ctx,
            "controlbus:route?routeId=my-route&action=start",
            blank_exchange(),
        )
        .await
        .unwrap();
        assert!(matches!(result.input.body, Body::Empty));

        let recorded = commands.lock().await.clone();
        assert_eq!(recorded, vec!["start:my-route".to_string()]);
    }

    #[tokio::test]
    async fn test_producer_stop_route() {
        let (ctx, commands) = recording_ctx("my-route", "Started");

        let result = send(
            &ctx,
            "controlbus:route?routeId=my-route&action=stop",
            blank_exchange(),
        )
        .await
        .unwrap();
        assert!(matches!(result.input.body, Body::Empty));

        let recorded = commands.lock().await.clone();
        assert_eq!(recorded, vec!["stop:my-route".to_string()]);
    }

    #[tokio::test]
    async fn test_producer_restart_maps_to_runtime_reload_command() {
        let (ctx, commands) = recording_ctx("my-route", "Started");

        let result = send(
            &ctx,
            "controlbus:route?routeId=my-route&action=restart",
            blank_exchange(),
        )
        .await
        .unwrap();
        assert!(matches!(result.input.body, Body::Empty));

        let recorded = commands.lock().await.clone();
        assert_eq!(recorded, vec!["reload:my-route".to_string()]);
    }

    #[tokio::test]
    async fn test_producer_status_route() {
        let ctx = test_producer_ctx_with_route("my-route", RouteStatus::Started);

        let result = send(
            &ctx,
            "controlbus:route?routeId=my-route&action=status",
            blank_exchange(),
        )
        .await
        .unwrap();
        assert_eq!(expect_text_body(&result), "Started");
    }

    #[tokio::test]
    async fn test_producer_status_failed_route() {
        let ctx =
            test_producer_ctx_with_route("my-route", RouteStatus::Failed("error msg".to_string()));

        let result = send(
            &ctx,
            "controlbus:route?routeId=my-route&action=status",
            blank_exchange(),
        )
        .await
        .unwrap();
        assert_eq!(expect_text_body(&result), "Failed: error msg");
    }

    #[tokio::test]
    async fn test_producer_status_uses_runtime_when_available() {
        let ctx = test_producer_ctx_with_runtime_status("runtime-route", "Started");

        let result = send(
            &ctx,
            "controlbus:route?routeId=runtime-route&action=status",
            blank_exchange(),
        )
        .await
        .unwrap();
        assert_eq!(expect_text_body(&result), "Started");
    }

    #[tokio::test]
    async fn test_producer_status_errors_when_runtime_route_is_missing() {
        let ctx = test_producer_ctx_with_empty_runtime();

        let err = send(
            &ctx,
            "controlbus:route?routeId=my-route&action=status",
            blank_exchange(),
        )
        .await
        .unwrap_err()
        .to_string();
        assert!(
            err.contains("not found"),
            "runtime miss should not fallback to controller: {err}"
        );
    }

    #[tokio::test]
    async fn test_producer_uses_header_route_id() {
        let ctx = test_producer_ctx_with_route("from-header", RouteStatus::Started);

        let mut exchange = blank_exchange();
        exchange.input.set_header(
            "CamelRouteId",
            serde_json::Value::String("from-header".to_string()),
        );

        // No routeId in the URI: the header is the only source.
        let result = send(&ctx, "controlbus:route?action=status", exchange)
            .await
            .unwrap();
        assert_eq!(expect_text_body(&result), "Started");
    }

    #[tokio::test]
    async fn test_producer_uri_route_id_overrides_header() {
        // Only "from-uri" is known to the runtime, so resolving the header value
        // instead would surface as a "not found" error.
        let ctx = test_producer_ctx_with_route("from-uri", RouteStatus::Started);

        let mut exchange = blank_exchange();
        exchange.input.set_header(
            "CamelRouteId",
            serde_json::Value::String("from-header".to_string()),
        );

        let result = send(
            &ctx,
            "controlbus:route?routeId=from-uri&action=status",
            exchange,
        )
        .await
        .unwrap();
        assert_eq!(
            expect_text_body(&result),
            "Started",
            "Should use URI routeId, not header"
        );
    }

    #[tokio::test]
    async fn test_producer_error_no_route_id() {
        let ctx = test_producer_ctx();

        let result = send(&ctx, "controlbus:route?action=status", blank_exchange()).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("routeId required"),
            "Error should mention routeId: {}",
            err
        );
    }

    #[tokio::test]
    async fn test_producer_error_route_not_found() {
        let ctx = test_producer_ctx();

        let result = send(
            &ctx,
            "controlbus:route?routeId=nonexistent&action=status",
            blank_exchange(),
        )
        .await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("not found"),
            "Error should mention not found: {}",
            err
        );
    }

    #[test]
    fn test_endpoint_parses_suspend_action() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=suspend")
            .unwrap();
        assert_eq!(
            endpoint.uri(),
            "controlbus:route?routeId=foo&action=suspend"
        );
    }

    #[test]
    fn test_endpoint_parses_resume_action() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=resume")
            .unwrap();
        assert_eq!(endpoint.uri(), "controlbus:route?routeId=foo&action=resume");
    }

    #[test]
    fn test_endpoint_parses_restart_action() {
        let comp = ControlBusComponent::new();
        let endpoint = comp
            .create_endpoint("controlbus:route?routeId=foo&action=restart")
            .unwrap();
        assert_eq!(
            endpoint.uri(),
            "controlbus:route?routeId=foo&action=restart"
        );
    }

    #[test]
    fn test_endpoint_rejects_unknown_command() {
        let comp = ControlBusComponent::new();
        let result = comp.create_endpoint("controlbus:unknown?action=start");
        assert!(result.is_err(), "Should error for unknown command");
    }
}