1use std::collections::HashMap;
7use std::sync::{Arc, Weak};
8use std::time::Duration;
9use tokio::sync::{Mutex, mpsc};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, Service, ServiceExt};
13use tracing::{error, info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::metrics::MetricsCollector;
17use camel_api::{
18 BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProducerContext,
19 RouteController, RuntimeCommand, RuntimeHandle, Value, body::Body,
20};
21use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
22use camel_endpoint::parse_uri;
23use camel_language_api::{Expression, Language, LanguageError, Predicate};
24use camel_processor::circuit_breaker::CircuitBreakerLayer;
25use camel_processor::error_handler::ErrorHandlerLayer;
26use camel_processor::script_mutator::ScriptMutator;
27use camel_processor::{ChoiceService, WhenClause};
28
29use crate::lifecycle::adapters::route_compiler::{compose_pipeline, compose_traced_pipeline};
30use crate::lifecycle::application::route_definition::{
31 BuilderStep, LanguageExpressionDef, RouteDefinition, RouteDefinitionInfo, ValueSourceDef,
32};
33use crate::shared::components::domain::Registry;
34use crate::shared::observability::domain::{DetailLevel, TracerConfig};
35use arc_swap::ArcSwap;
36use camel_bean::BeanRegistry;
37
/// Message describing a route whose consumer task ended with an error.
///
/// The controller sends one of these on the channel registered through
/// `DefaultRouteController::set_crash_notifier` when a route's consumer
/// returns an error from `start`.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// Identifier of the route whose consumer failed.
    pub route_id: String,
    /// `Display` rendering of the error returned by the consumer.
    pub error: String,
}
49
50pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
65unsafe impl Sync for SyncBoxProcessor {}
66
67type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
68pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
69
70#[async_trait::async_trait]
76pub trait RouteControllerInternal: RouteController + Send {
77 fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError>;
79
80 fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError>;
82
83 fn route_from_uri(&self, route_id: &str) -> Option<String>;
85
86 fn set_error_handler(&mut self, config: ErrorHandlerConfig);
88
89 fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>);
91
92 fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>);
94
95 fn route_count(&self) -> usize;
97
98 fn route_ids(&self) -> Vec<String>;
100
101 fn auto_startup_route_ids(&self) -> Vec<String>;
103
104 fn shutdown_route_ids(&self) -> Vec<String>;
106
107 fn set_tracer_config(&mut self, config: &TracerConfig);
109
110 fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError>;
113
114 fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError>;
116
117 async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
119
120 async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
122}
123
124struct ManagedRoute {
126 definition: RouteDefinitionInfo,
128 from_uri: String,
130 pipeline: SharedPipeline,
132 concurrency: Option<ConcurrencyModel>,
134 consumer_handle: Option<JoinHandle<()>>,
136 pipeline_handle: Option<JoinHandle<()>>,
138 consumer_cancel_token: CancellationToken,
141 pipeline_cancel_token: CancellationToken,
144 channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
147}
148
149fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
150 handle.as_ref().is_some_and(|h| !h.is_finished())
151}
152
153fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
154 match (
155 handle_is_running(&managed.consumer_handle),
156 handle_is_running(&managed.pipeline_handle),
157 ) {
158 (true, true) => "Started",
159 (false, true) => "Suspended",
160 (true, false) => "Stopping",
161 (false, false) => "Stopped",
162 }
163}
164
165async fn ready_with_backoff(
172 pipeline: &mut BoxProcessor,
173 cancel: &CancellationToken,
174) -> Result<(), CamelError> {
175 loop {
176 match pipeline.ready().await {
177 Ok(_) => return Ok(()),
178 Err(CamelError::CircuitOpen(ref msg)) => {
179 warn!("Circuit open, backing off: {msg}");
180 tokio::select! {
181 _ = tokio::time::sleep(Duration::from_secs(1)) => {
182 continue;
183 }
184 _ = cancel.cancelled() => {
185 return Err(CamelError::CircuitOpen(msg.clone()));
187 }
188 }
189 }
190 Err(e) => {
191 error!("Pipeline not ready: {e}");
192 return Err(e);
193 }
194 }
195 }
196}
197
198fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
199 let stamp = std::time::SystemTime::now()
200 .duration_since(std::time::UNIX_EPOCH)
201 .unwrap_or_default()
202 .as_nanos();
203 RuntimeCommand::FailRoute {
204 route_id: route_id.to_string(),
205 error: error.to_string(),
206 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
207 causation_id: None,
208 }
209}
210
211async fn publish_runtime_failure(
212 runtime: Option<Weak<dyn RuntimeHandle>>,
213 route_id: &str,
214 error: &str,
215) {
216 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
217 return;
218 };
219 let command = runtime_failure_command(route_id, error);
220 if let Err(runtime_error) = runtime.execute(command).await {
221 warn!(
222 route_id = %route_id,
223 error = %runtime_error,
224 "failed to synchronize route crash with runtime projection"
225 );
226 }
227}
228
229pub struct DefaultRouteController {
237 routes: HashMap<String, ManagedRoute>,
239 registry: Arc<std::sync::Mutex<Registry>>,
241 languages: SharedLanguageRegistry,
243 beans: Arc<std::sync::Mutex<BeanRegistry>>,
245 self_ref: Option<Arc<Mutex<dyn RouteController>>>,
248 runtime: Option<Weak<dyn RuntimeHandle>>,
250 global_error_handler: Option<ErrorHandlerConfig>,
252 crash_notifier: Option<mpsc::Sender<CrashNotification>>,
254 tracing_enabled: bool,
256 tracer_detail_level: DetailLevel,
258 tracer_metrics: Option<Arc<dyn MetricsCollector>>,
260}
261
262impl DefaultRouteController {
263 pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
265 Self::with_beans(
266 registry,
267 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
268 )
269 }
270
271 pub fn with_beans(
273 registry: Arc<std::sync::Mutex<Registry>>,
274 beans: Arc<std::sync::Mutex<BeanRegistry>>,
275 ) -> Self {
276 Self {
277 routes: HashMap::new(),
278 registry,
279 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
280 beans,
281 self_ref: None,
282 runtime: None,
283 global_error_handler: None,
284 crash_notifier: None,
285 tracing_enabled: false,
286 tracer_detail_level: DetailLevel::Minimal,
287 tracer_metrics: None,
288 }
289 }
290
291 pub fn with_languages(
293 registry: Arc<std::sync::Mutex<Registry>>,
294 languages: SharedLanguageRegistry,
295 ) -> Self {
296 Self {
297 routes: HashMap::new(),
298 registry,
299 languages,
300 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
301 self_ref: None,
302 runtime: None,
303 global_error_handler: None,
304 crash_notifier: None,
305 tracing_enabled: false,
306 tracer_detail_level: DetailLevel::Minimal,
307 tracer_metrics: None,
308 }
309 }
310
311 pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
315 self.self_ref = Some(self_ref);
316 }
317
318 pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
320 self.runtime = Some(Arc::downgrade(&runtime));
321 }
322
323 pub fn self_ref_for_supervision(&self) -> Option<Arc<Mutex<dyn RouteController>>> {
328 self.self_ref.clone()
329 }
330
331 pub fn runtime_handle_for_supervision(&self) -> Option<Arc<dyn RuntimeHandle>> {
333 self.runtime.as_ref().and_then(Weak::upgrade)
334 }
335
336 pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
341 self.crash_notifier = Some(tx);
342 }
343
344 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
346 self.global_error_handler = Some(config);
347 }
348
349 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
351 self.tracing_enabled = config.enabled;
352 self.tracer_detail_level = config.detail_level.clone();
353 self.tracer_metrics = config.metrics_collector.clone();
354 }
355
356 fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
357 let mut producer_ctx = ProducerContext::new();
358 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
359 producer_ctx = producer_ctx.with_runtime(runtime);
360 }
361 Ok(producer_ctx)
362 }
363
364 fn resolve_error_handler(
366 &self,
367 config: ErrorHandlerConfig,
368 producer_ctx: &ProducerContext,
369 registry: &Registry,
370 ) -> Result<ErrorHandlerLayer, CamelError> {
371 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
373 let parsed = parse_uri(uri)?;
374 let component = registry.get_or_err(&parsed.scheme)?;
375 let endpoint = component.create_endpoint(uri)?;
376 Some(endpoint.create_producer(producer_ctx)?)
377 } else {
378 None
379 };
380
381 let mut resolved_policies = Vec::new();
383 for policy in config.policies {
384 let handler_producer = if let Some(ref uri) = policy.handled_by {
385 let parsed = parse_uri(uri)?;
386 let component = registry.get_or_err(&parsed.scheme)?;
387 let endpoint = component.create_endpoint(uri)?;
388 Some(endpoint.create_producer(producer_ctx)?)
389 } else {
390 None
391 };
392 resolved_policies.push((policy, handler_producer));
393 }
394
395 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
396 }
397
398 fn resolve_language(&self, language: &str) -> Result<Arc<dyn Language>, CamelError> {
399 let guard = self
400 .languages
401 .lock()
402 .expect("mutex poisoned: another thread panicked while holding this lock");
403 guard.get(language).cloned().ok_or_else(|| {
404 CamelError::RouteError(format!(
405 "language `{language}` is not registered in CamelContext"
406 ))
407 })
408 }
409
410 fn compile_language_expression(
411 &self,
412 expression: &LanguageExpressionDef,
413 ) -> Result<Arc<dyn Expression>, CamelError> {
414 let language = self.resolve_language(&expression.language)?;
415 let compiled = language
416 .create_expression(&expression.source)
417 .map_err(|e| {
418 CamelError::RouteError(format!(
419 "failed to compile {} expression `{}`: {e}",
420 expression.language, expression.source
421 ))
422 })?;
423 Ok(Arc::from(compiled))
424 }
425
426 fn compile_language_predicate(
427 &self,
428 expression: &LanguageExpressionDef,
429 ) -> Result<Arc<dyn Predicate>, CamelError> {
430 let language = self.resolve_language(&expression.language)?;
431 let compiled = language.create_predicate(&expression.source).map_err(|e| {
432 CamelError::RouteError(format!(
433 "failed to compile {} predicate `{}`: {e}",
434 expression.language, expression.source
435 ))
436 })?;
437 Ok(Arc::from(compiled))
438 }
439
440 fn compile_filter_predicate(
441 &self,
442 expression: &LanguageExpressionDef,
443 ) -> Result<FilterPredicate, CamelError> {
444 let predicate = self.compile_language_predicate(expression)?;
445 Ok(Arc::new(move |exchange: &Exchange| {
446 predicate.matches(exchange).unwrap_or(false)
447 }))
448 }
449
450 fn value_to_body(value: Value) -> Body {
451 match value {
452 Value::Null => Body::Empty,
453 Value::String(text) => Body::Text(text),
454 other => Body::Json(other),
455 }
456 }
457
458 pub(crate) fn resolve_steps(
460 &self,
461 steps: Vec<BuilderStep>,
462 producer_ctx: &ProducerContext,
463 registry: Arc<std::sync::Mutex<Registry>>,
464 ) -> Result<Vec<BoxProcessor>, CamelError> {
465 let resolve_producer = |uri: &str| -> Result<BoxProcessor, CamelError> {
466 let parsed = parse_uri(uri)?;
467 let registry_guard = registry
468 .lock()
469 .expect("mutex poisoned: another thread panicked while holding this lock");
470 let component = registry_guard.get_or_err(&parsed.scheme)?;
471 let endpoint = component.create_endpoint(uri)?;
472 endpoint.create_producer(producer_ctx)
473 };
474
475 let mut processors: Vec<BoxProcessor> = Vec::new();
476 for step in steps {
477 match step {
478 BuilderStep::Processor(svc) => {
479 processors.push(svc);
480 }
481 BuilderStep::To(uri) => {
482 let producer = resolve_producer(&uri)?;
483 processors.push(producer);
484 }
485 BuilderStep::Stop => {
486 processors.push(BoxProcessor::new(camel_processor::StopService));
487 }
488 BuilderStep::Log { level, message } => {
489 let svc = camel_processor::LogProcessor::new(level, message);
490 processors.push(BoxProcessor::new(svc));
491 }
492 BuilderStep::DeclarativeSetHeader { key, value } => match value {
493 ValueSourceDef::Literal(value) => {
494 let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
495 processors.push(BoxProcessor::new(svc));
496 }
497 ValueSourceDef::Expression(expression) => {
498 let expression = self.compile_language_expression(&expression)?;
499 let svc = camel_processor::DynamicSetHeader::new(
500 IdentityProcessor,
501 key,
502 move |exchange: &Exchange| {
503 expression.evaluate(exchange).unwrap_or(Value::Null)
504 },
505 );
506 processors.push(BoxProcessor::new(svc));
507 }
508 },
509 BuilderStep::DeclarativeSetBody { value } => match value {
510 ValueSourceDef::Literal(value) => {
511 let body = Self::value_to_body(value);
512 let svc = camel_processor::SetBody::new(
513 IdentityProcessor,
514 move |_exchange: &Exchange| body.clone(),
515 );
516 processors.push(BoxProcessor::new(svc));
517 }
518 ValueSourceDef::Expression(expression) => {
519 let expression = self.compile_language_expression(&expression)?;
520 let svc = camel_processor::SetBody::new(
521 IdentityProcessor,
522 move |exchange: &Exchange| {
523 let value = expression.evaluate(exchange).unwrap_or(Value::Null);
524 Self::value_to_body(value)
525 },
526 );
527 processors.push(BoxProcessor::new(svc));
528 }
529 },
530 BuilderStep::DeclarativeFilter { predicate, steps } => {
531 let predicate = self.compile_filter_predicate(&predicate)?;
532 let sub_processors =
533 self.resolve_steps(steps, producer_ctx, registry.clone())?;
534 let sub_pipeline = compose_pipeline(sub_processors);
535 let svc =
536 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
537 processors.push(BoxProcessor::new(svc));
538 }
539 BuilderStep::DeclarativeChoice { whens, otherwise } => {
540 let mut when_clauses = Vec::new();
541 for when_step in whens {
542 let predicate = self.compile_filter_predicate(&when_step.predicate)?;
543 let sub_processors =
544 self.resolve_steps(when_step.steps, producer_ctx, registry.clone())?;
545 let pipeline = compose_pipeline(sub_processors);
546 when_clauses.push(WhenClause {
547 predicate,
548 pipeline,
549 });
550 }
551 let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
552 let sub_processors =
553 self.resolve_steps(otherwise_steps, producer_ctx, registry.clone())?;
554 Some(compose_pipeline(sub_processors))
555 } else {
556 None
557 };
558 let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
559 processors.push(BoxProcessor::new(svc));
560 }
561 BuilderStep::DeclarativeScript { expression } => {
562 let expression = self.compile_language_expression(&expression)?;
563 let svc = camel_processor::SetBody::new(
564 IdentityProcessor,
565 move |exchange: &Exchange| {
566 let value = expression.evaluate(exchange).unwrap_or(Value::Null);
567 Self::value_to_body(value)
568 },
569 );
570 processors.push(BoxProcessor::new(svc));
571 }
572 BuilderStep::Split { config, steps } => {
573 let sub_processors =
574 self.resolve_steps(steps, producer_ctx, registry.clone())?;
575 let sub_pipeline = compose_pipeline(sub_processors);
576 let splitter =
577 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
578 processors.push(BoxProcessor::new(splitter));
579 }
580 BuilderStep::DeclarativeSplit {
581 expression,
582 aggregation,
583 parallel,
584 parallel_limit,
585 stop_on_exception,
586 steps,
587 } => {
588 let lang_expr = self.compile_language_expression(&expression)?;
589 let split_fn = move |exchange: &Exchange| {
590 let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
591 match value {
592 Value::String(s) => s
593 .lines()
594 .filter(|line| !line.is_empty())
595 .map(|line| {
596 let mut fragment = exchange.clone();
597 fragment.input.body = Body::from(line.to_string());
598 fragment
599 })
600 .collect(),
601 Value::Array(arr) => arr
602 .into_iter()
603 .map(|v| {
604 let mut fragment = exchange.clone();
605 fragment.input.body = Body::from(v);
606 fragment
607 })
608 .collect(),
609 _ => vec![exchange.clone()],
610 }
611 };
612
613 let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
614 .aggregation(aggregation)
615 .parallel(parallel)
616 .stop_on_exception(stop_on_exception);
617 if let Some(limit) = parallel_limit {
618 config = config.parallel_limit(limit);
619 }
620
621 let sub_processors =
622 self.resolve_steps(steps, producer_ctx, registry.clone())?;
623 let sub_pipeline = compose_pipeline(sub_processors);
624 let splitter =
625 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
626 processors.push(BoxProcessor::new(splitter));
627 }
628 BuilderStep::Aggregate { config } => {
629 let svc = camel_processor::AggregatorService::new(config);
630 processors.push(BoxProcessor::new(svc));
631 }
632 BuilderStep::Filter { predicate, steps } => {
633 let sub_processors =
634 self.resolve_steps(steps, producer_ctx, registry.clone())?;
635 let sub_pipeline = compose_pipeline(sub_processors);
636 let svc =
637 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
638 processors.push(BoxProcessor::new(svc));
639 }
640 BuilderStep::Choice { whens, otherwise } => {
641 let mut when_clauses = Vec::new();
643 for when_step in whens {
644 let sub_processors =
645 self.resolve_steps(when_step.steps, producer_ctx, registry.clone())?;
646 let pipeline = compose_pipeline(sub_processors);
647 when_clauses.push(WhenClause {
648 predicate: when_step.predicate,
649 pipeline,
650 });
651 }
652 let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
654 let sub_processors =
655 self.resolve_steps(otherwise_steps, producer_ctx, registry.clone())?;
656 Some(compose_pipeline(sub_processors))
657 } else {
658 None
659 };
660 let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
661 processors.push(BoxProcessor::new(svc));
662 }
663 BuilderStep::WireTap { uri } => {
664 let producer = resolve_producer(&uri)?;
665 let svc = camel_processor::WireTapService::new(producer);
666 processors.push(BoxProcessor::new(svc));
667 }
668 BuilderStep::Multicast { config, steps } => {
669 let mut endpoints = Vec::new();
671 for step in steps {
672 let sub_processors =
673 self.resolve_steps(vec![step], producer_ctx, registry.clone())?;
674 let endpoint = compose_pipeline(sub_processors);
675 endpoints.push(endpoint);
676 }
677 let svc = camel_processor::MulticastService::new(endpoints, config);
678 processors.push(BoxProcessor::new(svc));
679 }
680 BuilderStep::DeclarativeLog { level, message } => {
681 let ValueSourceDef::Expression(expression) = message else {
682 unreachable!(
685 "DeclarativeLog with Literal should have been compiled to a Processor"
686 );
687 };
688 let expression = self.compile_language_expression(&expression)?;
689 let svc =
690 camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
691 expression
692 .evaluate(exchange)
693 .unwrap_or_else(|e| {
694 warn!(error = %e, "log expression evaluation failed");
695 Value::Null
696 })
697 .to_string()
698 });
699 processors.push(BoxProcessor::new(svc));
700 }
701 BuilderStep::Bean { name, method } => {
702 let beans = self.beans.lock().expect(
704 "beans mutex poisoned: another thread panicked while holding this lock",
705 );
706
707 let bean = beans.get(&name).ok_or_else(|| {
709 CamelError::ProcessorError(format!("Bean not found: {}", name))
710 })?;
711
712 let bean_clone = Arc::clone(&bean);
714 let method = method.clone();
715
716 let processor = tower::service_fn(move |mut exchange: Exchange| {
718 let bean = Arc::clone(&bean_clone);
719 let method = method.clone();
720
721 async move {
722 bean.call(&method, &mut exchange).await?;
723 Ok(exchange)
724 }
725 });
726
727 processors.push(BoxProcessor::new(processor));
728 }
729 BuilderStep::Script { language, script } => {
730 let lang = self.resolve_language(&language)?;
731 match lang.create_mutating_expression(&script) {
732 Ok(mut_expr) => {
733 processors.push(BoxProcessor::new(ScriptMutator::new(mut_expr)));
734 }
735 Err(LanguageError::NotSupported {
736 feature,
737 language: ref lang_name,
738 }) => {
739 return Err(CamelError::RouteError(format!(
740 "Language '{}' does not support {} (required for .script() step)",
741 lang_name, feature
742 )));
743 }
744 Err(e) => {
745 return Err(CamelError::RouteError(format!(
746 "Failed to create mutating expression for language '{}': {}",
747 language, e
748 )));
749 }
750 }
751 }
752 BuilderStep::Throttle { config, steps } => {
753 let sub_processors =
754 self.resolve_steps(steps, producer_ctx, registry.clone())?;
755 let sub_pipeline = compose_pipeline(sub_processors);
756 let svc =
757 camel_processor::throttler::ThrottlerService::new(config, sub_pipeline);
758 processors.push(BoxProcessor::new(svc));
759 }
760 BuilderStep::LoadBalance { config, steps } => {
761 let mut endpoints = Vec::new();
763 for step in steps {
764 let sub_processors =
765 self.resolve_steps(vec![step], producer_ctx, registry.clone())?;
766 let endpoint = compose_pipeline(sub_processors);
767 endpoints.push(endpoint);
768 }
769 let svc =
770 camel_processor::load_balancer::LoadBalancerService::new(endpoints, config);
771 processors.push(BoxProcessor::new(svc));
772 }
773 BuilderStep::DynamicRouter { config } => {
774 use camel_processor::dynamic_router::EndpointResolver;
775
776 let producer_ctx_clone = producer_ctx.clone();
777 let registry_clone = registry.clone();
778 let resolver: EndpointResolver = Arc::new(move |uri: &str| {
779 let parsed = match parse_uri(uri) {
780 Ok(p) => p,
781 Err(_) => return None,
782 };
783 let registry_guard = match registry_clone.lock() {
784 Ok(g) => g,
785 Err(_) => return None, };
787 let component = match registry_guard.get_or_err(&parsed.scheme) {
788 Ok(c) => c,
789 Err(_) => return None,
790 };
791 let endpoint = match component.create_endpoint(uri) {
792 Ok(e) => e,
793 Err(_) => return None,
794 };
795 let producer = match endpoint.create_producer(&producer_ctx_clone) {
796 Ok(p) => p,
797 Err(_) => return None,
798 };
799 Some(BoxProcessor::new(producer))
800 });
801 let svc = camel_processor::dynamic_router::DynamicRouterService::new(
802 config, resolver,
803 );
804 processors.push(BoxProcessor::new(svc));
805 }
806 }
807 }
808 Ok(processors)
809 }
810
811 pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
821 let route_id = definition.route_id().to_string();
822
823 if self.routes.contains_key(&route_id) {
824 return Err(CamelError::RouteError(format!(
825 "Route '{}' already exists",
826 route_id
827 )));
828 }
829
830 info!(route_id = %route_id, "Adding route to controller");
831
832 let definition_info = definition.to_info();
834 let from_uri = definition.from_uri.to_string();
835 let concurrency = definition.concurrency;
836
837 let producer_ctx = self.build_producer_context()?;
839
840 let processors =
842 self.resolve_steps(definition.steps, &producer_ctx, self.registry.clone())?;
843 let route_id_for_tracing = route_id.clone();
844 let mut pipeline = compose_traced_pipeline(
845 processors,
846 &route_id_for_tracing,
847 self.tracing_enabled,
848 self.tracer_detail_level.clone(),
849 self.tracer_metrics.clone(),
850 );
851
852 if let Some(cb_config) = definition.circuit_breaker {
854 let cb_layer = CircuitBreakerLayer::new(cb_config);
855 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
856 }
857
858 let eh_config = definition
860 .error_handler
861 .or_else(|| self.global_error_handler.clone());
862
863 if let Some(config) = eh_config {
864 let registry = self
866 .registry
867 .lock()
868 .expect("mutex poisoned: another thread panicked while holding this lock");
869 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
870 pipeline = BoxProcessor::new(layer.layer(pipeline));
871 }
872
873 self.routes.insert(
874 route_id.clone(),
875 ManagedRoute {
876 definition: definition_info,
877 from_uri,
878 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
879 concurrency,
880 consumer_handle: None,
881 pipeline_handle: None,
882 consumer_cancel_token: CancellationToken::new(),
883 pipeline_cancel_token: CancellationToken::new(),
884 channel_sender: None,
885 },
886 );
887
888 Ok(())
889 }
890
891 pub fn compile_route_definition(
896 &self,
897 def: RouteDefinition,
898 ) -> Result<BoxProcessor, CamelError> {
899 let route_id = def.route_id().to_string();
900
901 let producer_ctx = self.build_producer_context()?;
902
903 let processors = self.resolve_steps(def.steps, &producer_ctx, self.registry.clone())?;
904 let mut pipeline = compose_traced_pipeline(
905 processors,
906 &route_id,
907 self.tracing_enabled,
908 self.tracer_detail_level.clone(),
909 self.tracer_metrics.clone(),
910 );
911
912 if let Some(cb_config) = def.circuit_breaker {
913 let cb_layer = CircuitBreakerLayer::new(cb_config);
914 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
915 }
916
917 let eh_config = def
918 .error_handler
919 .or_else(|| self.global_error_handler.clone());
920 if let Some(config) = eh_config {
921 let registry = self
923 .registry
924 .lock()
925 .expect("mutex poisoned: registry lock in compile_route_definition");
926 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
927 pipeline = BoxProcessor::new(layer.layer(pipeline));
928 }
929
930 Ok(pipeline)
931 }
932
933 pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
939 let managed = self.routes.get(route_id).ok_or_else(|| {
940 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
941 })?;
942 if handle_is_running(&managed.consumer_handle)
943 || handle_is_running(&managed.pipeline_handle)
944 {
945 return Err(CamelError::RouteError(format!(
946 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
947 route_id,
948 inferred_lifecycle_label(managed)
949 )));
950 }
951 self.routes.remove(route_id);
952 info!(route_id = %route_id, "Route removed from controller");
953 Ok(())
954 }
955
956 pub fn route_count(&self) -> usize {
958 self.routes.len()
959 }
960
961 pub fn route_ids(&self) -> Vec<String> {
963 self.routes.keys().cloned().collect()
964 }
965
966 pub fn auto_startup_route_ids(&self) -> Vec<String> {
968 let mut pairs: Vec<(String, i32)> = self
969 .routes
970 .iter()
971 .filter(|(_, managed)| managed.definition.auto_startup())
972 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
973 .collect();
974 pairs.sort_by_key(|(_, order)| *order);
975 pairs.into_iter().map(|(id, _)| id).collect()
976 }
977
978 pub fn shutdown_route_ids(&self) -> Vec<String> {
980 let mut pairs: Vec<(String, i32)> = self
981 .routes
982 .iter()
983 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
984 .collect();
985 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
986 pairs.into_iter().map(|(id, _)| id).collect()
987 }
988
989 pub fn swap_pipeline(
994 &self,
995 route_id: &str,
996 new_pipeline: BoxProcessor,
997 ) -> Result<(), CamelError> {
998 let managed = self
999 .routes
1000 .get(route_id)
1001 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1002
1003 managed
1004 .pipeline
1005 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1006 info!(route_id = %route_id, "Pipeline swapped atomically");
1007 Ok(())
1008 }
1009
1010 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1012 self.routes.get(route_id).map(|r| r.from_uri.clone())
1013 }
1014
1015 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1020 self.routes
1021 .get(route_id)
1022 .map(|r| r.pipeline.load().0.clone())
1023 }
1024
1025 async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1027 let managed = self
1028 .routes
1029 .get_mut(route_id)
1030 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1031
1032 if !handle_is_running(&managed.consumer_handle)
1033 && !handle_is_running(&managed.pipeline_handle)
1034 {
1035 return Ok(());
1036 }
1037
1038 info!(route_id = %route_id, "Stopping route");
1039
1040 let managed = self
1042 .routes
1043 .get_mut(route_id)
1044 .expect("invariant: route must exist after prior existence check");
1045 managed.consumer_cancel_token.cancel();
1046 managed.pipeline_cancel_token.cancel();
1047
1048 let managed = self
1050 .routes
1051 .get_mut(route_id)
1052 .expect("invariant: route must exist after prior existence check");
1053 let consumer_handle = managed.consumer_handle.take();
1054 let pipeline_handle = managed.pipeline_handle.take();
1055
1056 let managed = self
1059 .routes
1060 .get_mut(route_id)
1061 .expect("invariant: route must exist after prior existence check");
1062 managed.channel_sender = None;
1063
1064 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1068 match (consumer_handle, pipeline_handle) {
1069 (Some(c), Some(p)) => {
1070 let _ = tokio::join!(c, p);
1071 }
1072 (Some(c), None) => {
1073 let _ = c.await;
1074 }
1075 (None, Some(p)) => {
1076 let _ = p.await;
1077 }
1078 (None, None) => {}
1079 }
1080 })
1081 .await;
1082
1083 if timeout_result.is_err() {
1084 warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
1085 }
1086
1087 let managed = self
1089 .routes
1090 .get_mut(route_id)
1091 .expect("invariant: route must exist after prior existence check");
1092
1093 managed.consumer_cancel_token = CancellationToken::new();
1095 managed.pipeline_cancel_token = CancellationToken::new();
1096
1097 info!(route_id = %route_id, "Route stopped");
1098 Ok(())
1099 }
1100}
1101
1102#[async_trait::async_trait]
1103impl RouteController for DefaultRouteController {
    /// Starts the route `route_id` by spawning a consumer task and a pipeline task.
    ///
    /// The consumer is created from the route's `from_uri` and pushes `ExchangeEnvelope`s into
    /// a bounded channel (capacity 256). The pipeline task drains that channel and runs each
    /// exchange through the pipeline that is current at the time the exchange is handled.
    ///
    /// Returns `Ok(())` without doing anything when both tasks are already running.
    ///
    /// # Errors
    ///
    /// - `CamelError::RouteError` if the route is unknown, if only the pipeline task is running
    ///   (reported as "suspended"), or if only the consumer task is running (reported as
    ///   "inconsistent execution state").
    /// - Any error returned by `parse_uri`, the registry lookup, `create_endpoint` or
    ///   `create_consumer`.
    ///
    /// # Panics
    ///
    /// Panics if the component registry mutex is poisoned.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // State check. The inner scope ends the borrow of `self.routes` before the rest of
        // the method touches other fields of `self`.
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            // Both tasks alive: nothing to do.
            if consumer_running && pipeline_running {
                return Ok(());
            }
            // Pipeline alive without a consumer is the state `suspend_route` leaves behind.
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            // Consumer alive without a pipeline: refuse to start on top of it.
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
            // Falling through means neither task is running.
        }

        info!(route_id = %route_id, "Starting route");

        // Copy out what the spawned tasks need so no borrow of `self.routes` is kept.
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        // Resolve the component by URI scheme and build the consumer while holding the
        // registry lock; the guard is released explicitly right after.
        let parsed = parse_uri(&from_uri)?;
        let registry = self
            .registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock");
        let component = registry.get_or_err(&parsed.scheme)?;
        let endpoint = component.create_endpoint(&from_uri)?;
        let mut consumer = endpoint.create_consumer()?;
        let consumer_concurrency = consumer.concurrency_model();
        drop(registry);

        // A concurrency setting stored on the route wins over the consumer's own model.
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Bounded channel between the consumer task and the pipeline task.
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        // Child tokens: cancelling the route-level tokens also cancels these.
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        // Extra sender kept on the route; `resume_route` hands a clone to a new consumer.
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        let route_id_for_consumer = route_id.to_string();
        // Consumer task: runs until `consumer.start` returns. Only an `Err` is reported.
        let consumer_handle = tokio::spawn(async move {
            if let Err(e) = consumer.start(consumer_ctx).await {
                error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
                let error_msg = e.to_string();

                // Send result is ignored: the notification is best-effort.
                if let Some(tx) = crash_notifier {
                    let _ = tx
                        .send(CrashNotification {
                            route_id: route_id_for_consumer.clone(),
                            error: error_msg.clone(),
                        })
                        .await;
                }

                publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
                    .await;
            }
        });

        let pipeline_handle = match effective_concurrency {
            // Sequential: one exchange is fully processed before the next is received.
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                // `recv()` yielded `None`: no more envelopes will arrive.
                                None => return,
                            },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Load the pipeline per exchange rather than once per task.
                        // NOTE(review): presumably so `swap_pipeline` takes effect on the
                        // next exchange — confirm against `swap_pipeline`.
                        let mut pipeline = pipeline.load().0.clone();

                        // If readiness fails, report it to the caller (if any) and end the
                        // whole pipeline task.
                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        // With a reply channel the caller gets the result; otherwise errors
                        // other than `CamelError::Stopped` are logged.
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            // Concurrent: each exchange is processed in its own spawned task.
            ConcurrencyModel::Concurrent { max } => {
                // Optional cap on how many exchanges run through the pipeline at once.
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                // `recv()` yielded `None`: no more envelopes will arrive.
                                None => return,
                            },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        // The per-exchange task is detached (its handle is discarded). The
                        // permit is acquired inside the task, so `max` limits running
                        // pipeline calls, not the number of spawned tasks waiting for one.
                        tokio::spawn(async move {
                            // NOTE(review): `expect` assumes the semaphore is never closed;
                            // nothing in this method closes it.
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            let mut pipe = pipe_ref.load().0.clone();

                            // Unlike the sequential branch, a readiness failure here ends
                            // only this exchange's task, not the receive loop.
                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };

        // Record the new tasks and the spare sender on the route.
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }
1305
1306 async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1307 self.stop_route_internal(route_id).await
1308 }
1309
1310 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1311 self.stop_route(route_id).await?;
1312 tokio::time::sleep(Duration::from_millis(100)).await;
1313 self.start_route(route_id).await
1314 }
1315
1316 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1317 let managed = self
1319 .routes
1320 .get_mut(route_id)
1321 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1322
1323 let consumer_running = handle_is_running(&managed.consumer_handle);
1324 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1325
1326 if !consumer_running || !pipeline_running {
1328 return Err(CamelError::RouteError(format!(
1329 "Cannot suspend route '{}' with execution lifecycle {}",
1330 route_id,
1331 inferred_lifecycle_label(managed)
1332 )));
1333 }
1334
1335 info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1336
1337 let managed = self
1339 .routes
1340 .get_mut(route_id)
1341 .expect("invariant: route must exist after prior existence check");
1342 managed.consumer_cancel_token.cancel();
1343
1344 let managed = self
1346 .routes
1347 .get_mut(route_id)
1348 .expect("invariant: route must exist after prior existence check");
1349 let consumer_handle = managed.consumer_handle.take();
1350
1351 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1353 if let Some(handle) = consumer_handle {
1354 let _ = handle.await;
1355 }
1356 })
1357 .await;
1358
1359 if timeout_result.is_err() {
1360 warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1361 }
1362
1363 let managed = self
1365 .routes
1366 .get_mut(route_id)
1367 .expect("invariant: route must exist after prior existence check");
1368
1369 managed.consumer_cancel_token = CancellationToken::new();
1371
1372 info!(route_id = %route_id, "Route suspended (pipeline still running)");
1373 Ok(())
1374 }
1375
1376 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1377 let managed = self
1379 .routes
1380 .get(route_id)
1381 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1382
1383 let consumer_running = handle_is_running(&managed.consumer_handle);
1384 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1385 if consumer_running || !pipeline_running {
1386 return Err(CamelError::RouteError(format!(
1387 "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1388 route_id,
1389 inferred_lifecycle_label(managed)
1390 )));
1391 }
1392
1393 let sender = managed.channel_sender.clone().ok_or_else(|| {
1395 CamelError::RouteError("Suspended route has no channel sender".into())
1396 })?;
1397
1398 let from_uri = managed.from_uri.clone();
1400
1401 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1402
1403 let parsed = parse_uri(&from_uri)?;
1405 let registry = self
1406 .registry
1407 .lock()
1408 .expect("mutex poisoned: another thread panicked while holding this lock");
1409 let component = registry.get_or_err(&parsed.scheme)?;
1410 let endpoint = component.create_endpoint(&from_uri)?;
1411 let mut consumer = endpoint.create_consumer()?;
1412 drop(registry);
1414
1415 let managed = self
1417 .routes
1418 .get_mut(route_id)
1419 .expect("invariant: route must exist after prior existence check");
1420
1421 let consumer_cancel = managed.consumer_cancel_token.child_token();
1423
1424 let crash_notifier = self.crash_notifier.clone();
1425 let runtime_for_consumer = self.runtime.clone();
1426
1427 let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1429
1430 let route_id_for_consumer = route_id.to_string();
1432 let consumer_handle = tokio::spawn(async move {
1433 if let Err(e) = consumer.start(consumer_ctx).await {
1434 error!(route_id = %route_id_for_consumer, "Consumer error on resume: {e}");
1435 let error_msg = e.to_string();
1436
1437 if let Some(tx) = crash_notifier {
1439 let _ = tx
1440 .send(CrashNotification {
1441 route_id: route_id_for_consumer.clone(),
1442 error: error_msg.clone(),
1443 })
1444 .await;
1445 }
1446
1447 publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1448 .await;
1449 }
1450 });
1451
1452 let managed = self
1454 .routes
1455 .get_mut(route_id)
1456 .expect("invariant: route must exist after prior existence check");
1457 managed.consumer_handle = Some(consumer_handle);
1458
1459 info!(route_id = %route_id, "Route resumed");
1460 Ok(())
1461 }
1462
1463 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1464 let route_ids: Vec<String> = {
1467 let mut pairs: Vec<_> = self
1468 .routes
1469 .iter()
1470 .filter(|(_, r)| r.definition.auto_startup())
1471 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1472 .collect();
1473 pairs.sort_by_key(|(_, order)| *order);
1474 pairs.into_iter().map(|(id, _)| id).collect()
1475 };
1476
1477 info!("Starting {} auto-startup routes", route_ids.len());
1478
1479 let mut errors: Vec<String> = Vec::new();
1481 for route_id in route_ids {
1482 if let Err(e) = self.start_route(&route_id).await {
1483 errors.push(format!("Route '{}': {}", route_id, e));
1484 }
1485 }
1486
1487 if !errors.is_empty() {
1488 return Err(CamelError::RouteError(format!(
1489 "Failed to start routes: {}",
1490 errors.join(", ")
1491 )));
1492 }
1493
1494 info!("All auto-startup routes started");
1495 Ok(())
1496 }
1497
1498 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1499 let route_ids: Vec<String> = {
1501 let mut pairs: Vec<_> = self
1502 .routes
1503 .iter()
1504 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1505 .collect();
1506 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1507 pairs.into_iter().map(|(id, _)| id).collect()
1508 };
1509
1510 info!("Stopping {} routes", route_ids.len());
1511
1512 for route_id in route_ids {
1513 let _ = self.stop_route(&route_id).await;
1514 }
1515
1516 info!("All routes stopped");
1517 Ok(())
1518 }
1519}
1520
1521#[async_trait::async_trait]
1522impl RouteControllerInternal for DefaultRouteController {
1523 fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
1524 DefaultRouteController::add_route(self, def)
1525 }
1526
1527 fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError> {
1528 DefaultRouteController::swap_pipeline(self, route_id, pipeline)
1529 }
1530
1531 fn route_from_uri(&self, route_id: &str) -> Option<String> {
1532 DefaultRouteController::route_from_uri(self, route_id)
1534 }
1535
1536 fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
1537 DefaultRouteController::set_error_handler(self, config)
1538 }
1539
1540 fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
1541 DefaultRouteController::set_self_ref(self, self_ref)
1542 }
1543
1544 fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
1545 DefaultRouteController::set_runtime_handle(self, runtime)
1546 }
1547
1548 fn route_count(&self) -> usize {
1549 DefaultRouteController::route_count(self)
1550 }
1551
1552 fn route_ids(&self) -> Vec<String> {
1553 DefaultRouteController::route_ids(self)
1554 }
1555
1556 fn auto_startup_route_ids(&self) -> Vec<String> {
1557 DefaultRouteController::auto_startup_route_ids(self)
1558 }
1559
1560 fn shutdown_route_ids(&self) -> Vec<String> {
1561 DefaultRouteController::shutdown_route_ids(self)
1562 }
1563
1564 fn set_tracer_config(&mut self, config: &TracerConfig) {
1565 DefaultRouteController::set_tracer_config(self, config)
1566 }
1567
1568 fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError> {
1569 DefaultRouteController::compile_route_definition(self, def)
1570 }
1571
1572 fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1573 DefaultRouteController::remove_route(self, route_id)
1574 }
1575
1576 async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1577 DefaultRouteController::start_route(self, route_id).await
1578 }
1579
1580 async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1581 DefaultRouteController::stop_route(self, route_id).await
1582 }
1583}
1584
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time guard: this only builds while `RouteControllerInternal` can be used as
    /// a trait object. Nothing is checked at run time.
    #[test]
    fn test_route_controller_internal_is_object_safe() {
        fn accepts_trait_object(_: Option<Box<dyn RouteControllerInternal>>) {}

        accepts_trait_object(None);
    }
}