camel_component_controlbus/
lib.rs1use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33
34#[cfg(test)]
35use async_trait::async_trait;
36use tokio::sync::Mutex;
37use tower::Service;
38use tracing::debug;
39
40use camel_api::{Body, BoxProcessor, CamelError, Exchange, RouteAction, RouteStatus};
41use camel_component::{Component, Consumer, Endpoint, ProducerContext};
42use camel_endpoint::parse_uri;
43
44pub struct ControlBusComponent;
50
51impl ControlBusComponent {
52 pub fn new() -> Self {
54 Self
55 }
56}
57
58impl Default for ControlBusComponent {
59 fn default() -> Self {
60 Self::new()
61 }
62}
63
64impl Component for ControlBusComponent {
65 fn scheme(&self) -> &str {
66 "controlbus"
67 }
68
69 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
70 let parts = parse_uri(uri)?;
71
72 if parts.scheme != "controlbus" {
73 return Err(CamelError::InvalidUri(format!(
74 "expected scheme 'controlbus', got '{}'",
75 parts.scheme
76 )));
77 }
78
79 let command = parts.path.clone();
81
82 if command != "route" {
84 return Err(CamelError::EndpointCreationFailed(format!(
85 "controlbus: unknown command '{}', only 'route' is supported",
86 command
87 )));
88 }
89
90 let route_id = parts.params.get("routeId").cloned();
92
93 let action = if let Some(action_str) = parts.params.get("action") {
95 Some(parse_action(action_str)?)
96 } else {
97 None
98 };
99
100 if command == "route" && action.is_none() {
102 return Err(CamelError::EndpointCreationFailed(
103 "controlbus: 'action' parameter is required for route command".to_string(),
104 ));
105 }
106
107 Ok(Box::new(ControlBusEndpoint {
108 uri: uri.to_string(),
109 route_id,
110 action,
111 }))
112 }
113}
114
115fn parse_action(s: &str) -> Result<RouteAction, CamelError> {
117 match s.to_lowercase().as_str() {
118 "start" => Ok(RouteAction::Start),
119 "stop" => Ok(RouteAction::Stop),
120 "suspend" => Ok(RouteAction::Suspend),
121 "resume" => Ok(RouteAction::Resume),
122 "restart" => Ok(RouteAction::Restart),
123 "status" => Ok(RouteAction::Status),
124 _ => Err(CamelError::EndpointCreationFailed(format!(
125 "controlbus: unknown action '{}'",
126 s
127 ))),
128 }
129}
130
131struct ControlBusEndpoint {
137 uri: String,
138 route_id: Option<String>,
139 action: Option<RouteAction>,
140}
141
142impl Endpoint for ControlBusEndpoint {
143 fn uri(&self) -> &str {
144 &self.uri
145 }
146
147 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
148 Err(CamelError::EndpointCreationFailed(
149 "controlbus endpoint does not support consumers".to_string(),
150 ))
151 }
152
153 fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
154 let action = self.action.clone().ok_or_else(|| {
155 CamelError::EndpointCreationFailed(
156 "controlbus: action is required to create producer".to_string(),
157 )
158 })?;
159
160 Ok(BoxProcessor::new(ControlBusProducer {
161 route_id: self.route_id.clone(),
162 action,
163 controller: ctx.route_controller().clone(),
164 }))
165 }
166}
167
168#[derive(Clone)]
174struct ControlBusProducer {
175 route_id: Option<String>,
177 action: RouteAction,
179 controller: Arc<Mutex<dyn camel_api::RouteController>>,
181}
182
183impl Service<Exchange> for ControlBusProducer {
184 type Response = Exchange;
185 type Error = CamelError;
186 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
187
188 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
189 Poll::Ready(Ok(()))
190 }
191
192 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
193 let route_id = self.route_id.clone().or_else(|| {
195 exchange
196 .input
197 .header("CamelRouteId")
198 .and_then(|v| v.as_str().map(|s| s.to_string()))
199 });
200
201 let route_id = match route_id {
203 Some(id) => id,
204 None => {
205 return Box::pin(async {
206 Err(CamelError::ProcessorError(
207 "controlbus: routeId required (set via URI param or CamelRouteId header)"
208 .into(),
209 ))
210 });
211 }
212 };
213
214 let action = self.action.clone();
215 let controller = self.controller.clone();
216
217 Box::pin(async move {
218 let mut ctrl = controller.lock().await;
219
220 debug!(
221 route_id = %route_id,
222 action = ?action,
223 "ControlBus executing action"
224 );
225
226 match action {
227 RouteAction::Start => {
228 ctrl.start_route(&route_id).await?;
229 }
230 RouteAction::Stop => {
231 ctrl.stop_route(&route_id).await?;
232 }
233 RouteAction::Suspend => {
234 ctrl.suspend_route(&route_id).await?;
235 }
236 RouteAction::Resume => {
237 ctrl.resume_route(&route_id).await?;
238 }
239 RouteAction::Restart => {
240 ctrl.restart_route(&route_id).await?;
241 }
242 RouteAction::Status => {
243 let status = ctrl.route_status(&route_id).ok_or_else(|| {
244 CamelError::ProcessorError(format!(
245 "controlbus: route '{}' not found",
246 route_id
247 ))
248 })?;
249 exchange.input.body = Body::Text(format_status(&status));
250 return Ok(exchange);
251 }
252 }
253
254 exchange.input.body = Body::Empty;
256 Ok(exchange)
257 })
258 }
259}
260
261fn format_status(status: &RouteStatus) -> String {
263 match status {
264 RouteStatus::Stopped => "Stopped".to_string(),
265 RouteStatus::Starting => "Starting".to_string(),
266 RouteStatus::Started => "Started".to_string(),
267 RouteStatus::Stopping => "Stopping".to_string(),
268 RouteStatus::Suspended => "Suspended".to_string(),
269 RouteStatus::Failed(msg) => format!("Failed: {}", msg),
270 }
271}
272
273#[cfg(test)]
278mod tests {
279 use super::*;
280 use camel_api::Message;
281 use tower::ServiceExt;
282
283 struct MockRouteController {
285 routes: std::collections::HashMap<String, RouteStatus>,
286 }
287
288 impl MockRouteController {
289 fn new() -> Self {
290 Self {
291 routes: std::collections::HashMap::new(),
292 }
293 }
294
295 fn with_route(mut self, id: &str, status: RouteStatus) -> Self {
296 self.routes.insert(id.to_string(), status);
297 self
298 }
299 }
300
301 #[async_trait]
302 impl camel_api::RouteController for MockRouteController {
303 async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
304 if self.routes.contains_key(route_id) {
305 self.routes
306 .insert(route_id.to_string(), RouteStatus::Started);
307 Ok(())
308 } else {
309 Err(CamelError::ProcessorError(format!(
310 "route '{}' not found",
311 route_id
312 )))
313 }
314 }
315
316 async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
317 if self.routes.contains_key(route_id) {
318 self.routes
319 .insert(route_id.to_string(), RouteStatus::Stopped);
320 Ok(())
321 } else {
322 Err(CamelError::ProcessorError(format!(
323 "route '{}' not found",
324 route_id
325 )))
326 }
327 }
328
329 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
330 if self.routes.contains_key(route_id) {
331 self.routes
332 .insert(route_id.to_string(), RouteStatus::Started);
333 Ok(())
334 } else {
335 Err(CamelError::ProcessorError(format!(
336 "route '{}' not found",
337 route_id
338 )))
339 }
340 }
341
342 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
343 if self.routes.contains_key(route_id) {
344 self.routes
345 .insert(route_id.to_string(), RouteStatus::Suspended);
346 Ok(())
347 } else {
348 Err(CamelError::ProcessorError(format!(
349 "route '{}' not found",
350 route_id
351 )))
352 }
353 }
354
355 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
356 if self.routes.contains_key(route_id) {
357 self.routes
358 .insert(route_id.to_string(), RouteStatus::Started);
359 Ok(())
360 } else {
361 Err(CamelError::ProcessorError(format!(
362 "route '{}' not found",
363 route_id
364 )))
365 }
366 }
367
368 fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
369 self.routes.get(route_id).cloned()
370 }
371
372 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
373 for status in self.routes.values_mut() {
374 *status = RouteStatus::Started;
375 }
376 Ok(())
377 }
378
379 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
380 for status in self.routes.values_mut() {
381 *status = RouteStatus::Stopped;
382 }
383 Ok(())
384 }
385 }
386
387 fn test_producer_ctx() -> ProducerContext {
388 ProducerContext::new(Arc::new(Mutex::new(MockRouteController::new())))
389 }
390
391 fn test_producer_ctx_with_route(id: &str, status: RouteStatus) -> ProducerContext {
392 ProducerContext::new(Arc::new(Mutex::new(
393 MockRouteController::new().with_route(id, status),
394 )))
395 }
396
397 #[test]
398 fn test_endpoint_requires_action_for_route_command() {
399 let comp = ControlBusComponent::new();
400 let result = comp.create_endpoint("controlbus:route?routeId=foo");
401 assert!(result.is_err(), "Should error when action is missing");
402 }
403
404 #[test]
405 fn test_endpoint_rejects_unknown_action() {
406 let comp = ControlBusComponent::new();
407 let result = comp.create_endpoint("controlbus:route?routeId=foo&action=banana");
408 assert!(result.is_err(), "Should error for unknown action");
409 }
410
411 #[test]
412 fn test_endpoint_parses_valid_uri() {
413 let comp = ControlBusComponent::new();
414 let endpoint = comp
415 .create_endpoint("controlbus:route?routeId=foo&action=start")
416 .unwrap();
417 assert_eq!(endpoint.uri(), "controlbus:route?routeId=foo&action=start");
418 }
419
420 #[test]
421 fn test_endpoint_returns_no_consumer() {
422 let comp = ControlBusComponent::new();
423 let endpoint = comp
424 .create_endpoint("controlbus:route?routeId=foo&action=stop")
425 .unwrap();
426 assert!(endpoint.create_consumer().is_err());
427 }
428
429 #[test]
430 fn test_endpoint_creates_producer() {
431 let ctx = test_producer_ctx();
432 let comp = ControlBusComponent::new();
433 let endpoint = comp
434 .create_endpoint("controlbus:route?routeId=foo&action=start")
435 .unwrap();
436 assert!(endpoint.create_producer(&ctx).is_ok());
437 }
438
439 #[test]
440 fn test_component_scheme() {
441 let comp = ControlBusComponent::new();
442 assert_eq!(comp.scheme(), "controlbus");
443 }
444
445 #[tokio::test]
446 async fn test_producer_start_route() {
447 let ctx = test_producer_ctx_with_route("my-route", RouteStatus::Stopped);
448 let comp = ControlBusComponent::new();
449 let endpoint = comp
450 .create_endpoint("controlbus:route?routeId=my-route&action=start")
451 .unwrap();
452 let producer = endpoint.create_producer(&ctx).unwrap();
453
454 let exchange = Exchange::new(Message::default());
455 let result = producer.oneshot(exchange).await.unwrap();
456 assert!(matches!(result.input.body, Body::Empty));
457 }
458
459 #[tokio::test]
460 async fn test_producer_stop_route() {
461 let ctx = test_producer_ctx_with_route("my-route", RouteStatus::Started);
462 let comp = ControlBusComponent::new();
463 let endpoint = comp
464 .create_endpoint("controlbus:route?routeId=my-route&action=stop")
465 .unwrap();
466 let producer = endpoint.create_producer(&ctx).unwrap();
467
468 let exchange = Exchange::new(Message::default());
469 let result = producer.oneshot(exchange).await.unwrap();
470 assert!(matches!(result.input.body, Body::Empty));
471 }
472
473 #[tokio::test]
474 async fn test_producer_status_route() {
475 let ctx = test_producer_ctx_with_route("my-route", RouteStatus::Started);
476 let comp = ControlBusComponent::new();
477 let endpoint = comp
478 .create_endpoint("controlbus:route?routeId=my-route&action=status")
479 .unwrap();
480 let producer = endpoint.create_producer(&ctx).unwrap();
481
482 let exchange = Exchange::new(Message::default());
483 let result = producer.oneshot(exchange).await.unwrap();
484 assert!(matches!(result.input.body, Body::Text(_)));
485 if let Body::Text(status) = &result.input.body {
486 assert_eq!(status, "Started");
487 }
488 }
489
490 #[tokio::test]
491 async fn test_producer_status_failed_route() {
492 let ctx =
493 test_producer_ctx_with_route("my-route", RouteStatus::Failed("error msg".to_string()));
494 let comp = ControlBusComponent::new();
495 let endpoint = comp
496 .create_endpoint("controlbus:route?routeId=my-route&action=status")
497 .unwrap();
498 let producer = endpoint.create_producer(&ctx).unwrap();
499
500 let exchange = Exchange::new(Message::default());
501 let result = producer.oneshot(exchange).await.unwrap();
502 assert!(matches!(result.input.body, Body::Text(_)));
503 if let Body::Text(status) = &result.input.body {
504 assert_eq!(status, "Failed: error msg");
505 }
506 }
507
508 #[tokio::test]
509 async fn test_producer_uses_header_route_id() {
510 let ctx = test_producer_ctx_with_route("from-header", RouteStatus::Started);
511 let comp = ControlBusComponent::new();
512 let endpoint = comp
514 .create_endpoint("controlbus:route?action=status")
515 .unwrap();
516 let producer = endpoint.create_producer(&ctx).unwrap();
517
518 let mut exchange = Exchange::new(Message::default());
519 exchange.input.set_header(
520 "CamelRouteId",
521 serde_json::Value::String("from-header".to_string()),
522 );
523
524 let result = producer.oneshot(exchange).await.unwrap();
525 assert!(matches!(result.input.body, Body::Text(_)));
526 if let Body::Text(status) = &result.input.body {
527 assert_eq!(status, "Started");
528 }
529 }
530
531 #[tokio::test]
532 async fn test_producer_uri_route_id_overrides_header() {
533 let ctx = test_producer_ctx_with_route("from-uri", RouteStatus::Started);
534 let comp = ControlBusComponent::new();
536 let endpoint = comp
537 .create_endpoint("controlbus:route?routeId=from-uri&action=status")
538 .unwrap();
539 let producer = endpoint.create_producer(&ctx).unwrap();
540
541 let mut exchange = Exchange::new(Message::default());
542 exchange.input.set_header(
544 "CamelRouteId",
545 serde_json::Value::String("from-header".to_string()),
546 );
547
548 let result = producer.oneshot(exchange).await.unwrap();
549 if let Body::Text(status) = &result.input.body {
550 assert_eq!(status, "Started", "Should use URI routeId, not header");
551 }
552 }
553
554 #[tokio::test]
555 async fn test_producer_error_no_route_id() {
556 let ctx = test_producer_ctx();
557 let comp = ControlBusComponent::new();
558 let endpoint = comp
559 .create_endpoint("controlbus:route?action=status")
560 .unwrap();
561 let producer = endpoint.create_producer(&ctx).unwrap();
562
563 let exchange = Exchange::new(Message::default());
564 let result = producer.oneshot(exchange).await;
565 assert!(result.is_err());
566 let err = result.unwrap_err().to_string();
567 assert!(
568 err.contains("routeId required"),
569 "Error should mention routeId: {}",
570 err
571 );
572 }
573
574 #[tokio::test]
575 async fn test_producer_error_route_not_found() {
576 let ctx = test_producer_ctx(); let comp = ControlBusComponent::new();
578 let endpoint = comp
579 .create_endpoint("controlbus:route?routeId=nonexistent&action=status")
580 .unwrap();
581 let producer = endpoint.create_producer(&ctx).unwrap();
582
583 let exchange = Exchange::new(Message::default());
584 let result = producer.oneshot(exchange).await;
585 assert!(result.is_err());
586 let err = result.unwrap_err().to_string();
587 assert!(
588 err.contains("not found"),
589 "Error should mention not found: {}",
590 err
591 );
592 }
593
594 #[test]
595 fn test_endpoint_parses_suspend_action() {
596 let comp = ControlBusComponent::new();
597 let endpoint = comp
598 .create_endpoint("controlbus:route?routeId=foo&action=suspend")
599 .unwrap();
600 assert_eq!(
601 endpoint.uri(),
602 "controlbus:route?routeId=foo&action=suspend"
603 );
604 }
605
606 #[test]
607 fn test_endpoint_parses_resume_action() {
608 let comp = ControlBusComponent::new();
609 let endpoint = comp
610 .create_endpoint("controlbus:route?routeId=foo&action=resume")
611 .unwrap();
612 assert_eq!(endpoint.uri(), "controlbus:route?routeId=foo&action=resume");
613 }
614
615 #[test]
616 fn test_endpoint_parses_restart_action() {
617 let comp = ControlBusComponent::new();
618 let endpoint = comp
619 .create_endpoint("controlbus:route?routeId=foo&action=restart")
620 .unwrap();
621 assert_eq!(
622 endpoint.uri(),
623 "controlbus:route?routeId=foo&action=restart"
624 );
625 }
626
627 #[test]
628 fn test_endpoint_rejects_unknown_command() {
629 let comp = ControlBusComponent::new();
630 let result = comp.create_endpoint("controlbus:unknown?action=start");
631 assert!(result.is_err(), "Should error for unknown command");
632 }
633}