1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::{Mutex, mpsc};
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::error_handler::ErrorHandlerConfig;
18use camel_api::metrics::MetricsCollector;
19use camel_api::{
20 BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProducerContext,
21 RouteController, RuntimeCommand, RuntimeHandle, Value, body::Body,
22};
23use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
24use camel_endpoint::parse_uri;
25use camel_language_api::{Expression, Language, LanguageError, Predicate};
26use camel_processor::circuit_breaker::CircuitBreakerLayer;
27use camel_processor::error_handler::ErrorHandlerLayer;
28use camel_processor::script_mutator::ScriptMutator;
29use camel_processor::{ChoiceService, WhenClause};
30
31use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
32use crate::lifecycle::adapters::route_compiler::{
33 compose_pipeline, compose_traced_pipeline_with_contracts,
34};
35use crate::lifecycle::application::route_definition::{
36 BuilderStep, LanguageExpressionDef, RouteDefinition, RouteDefinitionInfo, ValueSourceDef,
37};
38use crate::shared::components::domain::Registry;
39use crate::shared::observability::domain::{DetailLevel, TracerConfig};
40use arc_swap::ArcSwap;
41use camel_bean::BeanRegistry;
42
/// Message type carried by the crash-notifier channel registered through
/// `DefaultRouteController::set_crash_notifier`.
///
/// NOTE(review): the code that actually sends these notifications is not in
/// the visible part of the file; the field notes below are inferred from the
/// field names and types — confirm against the sender.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// Identifier of the route the notification refers to.
    pub route_id: String,
    /// Textual description of the failure.
    pub error: String,
}
54
/// Newtype around [`BoxProcessor`] that additionally declares itself `Sync`
/// (see the manual impl below). It is the value type stored in `SharedPipeline`.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);

// SAFETY: NOTE(review) — the invariant that makes this impl sound is not
// stated in the visible code. In this file the wrapped processor is only
// reached through `ArcSwap::load()` and then cloned (`get_pipeline`);
// presumably it is never called through a shared reference. Confirm that
// cloning a `BoxProcessor` via `&BoxProcessor` cannot race, and record the
// invariant here.
unsafe impl Sync for SyncBoxProcessor {}
71
/// A route pipeline that can be replaced through a shared reference:
/// `swap_pipeline` stores a new value, `get_pipeline` loads and clones the current one.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
/// Shared map from language name to its [`Language`] implementation, guarded
/// by a blocking (`std::sync`) mutex.
pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
74
/// Extension of [`RouteController`] with route registration, pipeline
/// compilation/swapping and introspection operations.
///
/// NOTE(review): the per-method notes below are written from the signatures
/// and from the same-named inherent methods on `DefaultRouteController`; the
/// trait implementation itself is not visible in this part of the file —
/// confirm before relying on them.
#[async_trait::async_trait]
pub trait RouteControllerInternal: RouteController + Send {
    /// Register a route built from `def`.
    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError>;

    /// Replace the pipeline of the route identified by `route_id`.
    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError>;

    /// Source URI of the route, or `None` when the route is unknown.
    fn route_from_uri(&self, route_id: &str) -> Option<String>;

    /// Set the error-handler configuration kept on the controller.
    fn set_error_handler(&mut self, config: ErrorHandlerConfig);

    /// Hand the controller a shared handle to a `RouteController`
    /// (presumably itself — TODO confirm at the call site).
    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>);

    /// Attach the runtime handle.
    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>);

    /// Number of registered routes.
    fn route_count(&self) -> usize;

    /// In-flight counter value for the route, or `None` when the route is unknown.
    fn in_flight_count(&self, route_id: &str) -> Option<u64>;

    /// Whether a route with this id is registered.
    fn route_exists(&self, route_id: &str) -> bool;

    /// Ids of all registered routes.
    fn route_ids(&self) -> Vec<String>;

    /// Ids of the routes flagged for automatic startup.
    fn auto_startup_route_ids(&self) -> Vec<String>;

    /// Ids of the routes in the order they should be shut down.
    fn shutdown_route_ids(&self) -> Vec<String>;

    /// Apply tracer settings from `config`.
    fn set_tracer_config(&mut self, config: &TracerConfig);

    /// Compile `def` into a pipeline without registering it.
    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError>;

    /// Remove the route identified by `route_id`.
    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Start the route as part of a reload (semantics not visible here — TODO confirm).
    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Stop the route as part of a reload (semantics not visible here — TODO confirm).
    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
}
134
/// Per-route state kept in `DefaultRouteController::routes`, keyed by route id.
struct ManagedRoute {
    /// Metadata obtained from `RouteDefinition::to_info()` when the route is
    /// added; queried for `auto_startup()` and `startup_order()`.
    definition: RouteDefinitionInfo,
    /// String form of the definition's `from_uri`.
    from_uri: String,
    /// Current pipeline; swappable through `&self` via `swap_pipeline`.
    pipeline: SharedPipeline,
    /// Concurrency setting copied from the route definition.
    concurrency: Option<ConcurrencyModel>,
    /// Handle of the consumer task; `None` right after `add_route`.
    /// NOTE(review): presumably filled in when the route starts — start code not visible here.
    consumer_handle: Option<JoinHandle<()>>,
    /// Handle of the pipeline task; `None` right after `add_route`.
    pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for the consumer side (fresh token at `add_route`).
    consumer_cancel_token: CancellationToken,
    /// Cancellation token for the pipeline side (fresh token at `add_route`).
    pipeline_cancel_token: CancellationToken,
    /// Sender half of the exchange channel; `None` right after `add_route`.
    channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// Counter shared with the unit-of-work layer; `None` when the route was
    /// added without a `unit_of_work` configuration.
    in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
}
161
162fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
163 handle.as_ref().is_some_and(|h| !h.is_finished())
164}
165
166fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
167 match (
168 handle_is_running(&managed.consumer_handle),
169 handle_is_running(&managed.pipeline_handle),
170 ) {
171 (true, true) => "Started",
172 (false, true) => "Suspended",
173 (true, false) => "Stopping",
174 (false, false) => "Stopped",
175 }
176}
177
178async fn ready_with_backoff(
185 pipeline: &mut BoxProcessor,
186 cancel: &CancellationToken,
187) -> Result<(), CamelError> {
188 loop {
189 match pipeline.ready().await {
190 Ok(_) => return Ok(()),
191 Err(CamelError::CircuitOpen(ref msg)) => {
192 warn!("Circuit open, backing off: {msg}");
193 tokio::select! {
194 _ = tokio::time::sleep(Duration::from_secs(1)) => {
195 continue;
196 }
197 _ = cancel.cancelled() => {
198 return Err(CamelError::CircuitOpen(msg.clone()));
200 }
201 }
202 }
203 Err(e) => {
204 error!("Pipeline not ready: {e}");
205 return Err(e);
206 }
207 }
208 }
209}
210
211fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
212 let stamp = std::time::SystemTime::now()
213 .duration_since(std::time::UNIX_EPOCH)
214 .unwrap_or_default()
215 .as_nanos();
216 RuntimeCommand::FailRoute {
217 route_id: route_id.to_string(),
218 error: error.to_string(),
219 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
220 causation_id: None,
221 }
222}
223
224async fn publish_runtime_failure(
225 runtime: Option<Weak<dyn RuntimeHandle>>,
226 route_id: &str,
227 error: &str,
228) {
229 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
230 return;
231 };
232 let command = runtime_failure_command(route_id, error);
233 if let Err(runtime_error) = runtime.execute(command).await {
234 warn!(
235 route_id = %route_id,
236 error = %runtime_error,
237 "failed to synchronize route crash with runtime projection"
238 );
239 }
240}
241
/// Route controller that owns the registered routes and the shared
/// registries needed to compile their pipelines.
pub struct DefaultRouteController {
    /// Registered routes keyed by route id.
    routes: HashMap<String, ManagedRoute>,
    /// Component registry used to resolve endpoint URIs by scheme.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Languages available to declarative expressions and predicates.
    languages: SharedLanguageRegistry,
    /// Beans looked up by name for `BuilderStep::Bean`.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Shared handle stored by `set_self_ref` and handed out by
    /// `self_ref_for_supervision`; `None` until set.
    self_ref: Option<Arc<Mutex<dyn RouteController>>>,
    /// Weak reference to the runtime (downgraded in `set_runtime_handle`);
    /// `None` until set.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Error-handler configuration used for routes that define none of their own.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Channel stored by `set_crash_notifier`.
    /// NOTE(review): its use is not in the visible part of the file.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Tracing switch passed to pipeline composition.
    tracing_enabled: bool,
    /// Tracing detail level passed to pipeline composition.
    tracer_detail_level: DetailLevel,
    /// Optional metrics collector passed to pipeline composition.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
}
274
275impl DefaultRouteController {
276 pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
278 Self::with_beans(
279 registry,
280 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
281 )
282 }
283
284 pub fn with_beans(
286 registry: Arc<std::sync::Mutex<Registry>>,
287 beans: Arc<std::sync::Mutex<BeanRegistry>>,
288 ) -> Self {
289 Self {
290 routes: HashMap::new(),
291 registry,
292 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
293 beans,
294 self_ref: None,
295 runtime: None,
296 global_error_handler: None,
297 crash_notifier: None,
298 tracing_enabled: false,
299 tracer_detail_level: DetailLevel::Minimal,
300 tracer_metrics: None,
301 }
302 }
303
304 pub fn with_languages(
306 registry: Arc<std::sync::Mutex<Registry>>,
307 languages: SharedLanguageRegistry,
308 ) -> Self {
309 Self {
310 routes: HashMap::new(),
311 registry,
312 languages,
313 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
314 self_ref: None,
315 runtime: None,
316 global_error_handler: None,
317 crash_notifier: None,
318 tracing_enabled: false,
319 tracer_detail_level: DetailLevel::Minimal,
320 tracer_metrics: None,
321 }
322 }
323
324 pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
328 self.self_ref = Some(self_ref);
329 }
330
331 pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
333 self.runtime = Some(Arc::downgrade(&runtime));
334 }
335
336 pub fn self_ref_for_supervision(&self) -> Option<Arc<Mutex<dyn RouteController>>> {
341 self.self_ref.clone()
342 }
343
344 pub fn runtime_handle_for_supervision(&self) -> Option<Arc<dyn RuntimeHandle>> {
346 self.runtime.as_ref().and_then(Weak::upgrade)
347 }
348
349 pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
354 self.crash_notifier = Some(tx);
355 }
356
357 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
359 self.global_error_handler = Some(config);
360 }
361
362 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
364 self.tracing_enabled = config.enabled;
365 self.tracer_detail_level = config.detail_level.clone();
366 self.tracer_metrics = config.metrics_collector.clone();
367 }
368
369 fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
370 let mut producer_ctx = ProducerContext::new();
371 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
372 producer_ctx = producer_ctx.with_runtime(runtime);
373 }
374 Ok(producer_ctx)
375 }
376
377 fn resolve_error_handler(
379 &self,
380 config: ErrorHandlerConfig,
381 producer_ctx: &ProducerContext,
382 registry: &Registry,
383 ) -> Result<ErrorHandlerLayer, CamelError> {
384 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
386 let parsed = parse_uri(uri)?;
387 let component = registry.get_or_err(&parsed.scheme)?;
388 let endpoint = component.create_endpoint(uri)?;
389 Some(endpoint.create_producer(producer_ctx)?)
390 } else {
391 None
392 };
393
394 let mut resolved_policies = Vec::new();
396 for policy in config.policies {
397 let handler_producer = if let Some(ref uri) = policy.handled_by {
398 let parsed = parse_uri(uri)?;
399 let component = registry.get_or_err(&parsed.scheme)?;
400 let endpoint = component.create_endpoint(uri)?;
401 Some(endpoint.create_producer(producer_ctx)?)
402 } else {
403 None
404 };
405 resolved_policies.push((policy, handler_producer));
406 }
407
408 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
409 }
410
411 fn resolve_uow_layer(
414 &self,
415 config: &UnitOfWorkConfig,
416 producer_ctx: &ProducerContext,
417 registry: &Registry,
418 counter: Option<Arc<AtomicU64>>,
419 ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
420 let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
421 let parsed = parse_uri(uri)?;
422 let component = registry.get_or_err(&parsed.scheme)?;
423 let endpoint = component.create_endpoint(uri)?;
424 endpoint.create_producer(producer_ctx).map_err(|e| {
425 CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
426 })
427 };
428
429 let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
430 let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
431
432 let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
433 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
434 Ok((layer, counter))
435 }
436
437 fn resolve_language(&self, language: &str) -> Result<Arc<dyn Language>, CamelError> {
438 let guard = self
439 .languages
440 .lock()
441 .expect("mutex poisoned: another thread panicked while holding this lock");
442 guard.get(language).cloned().ok_or_else(|| {
443 CamelError::RouteError(format!(
444 "language `{language}` is not registered in CamelContext"
445 ))
446 })
447 }
448
449 fn compile_language_expression(
450 &self,
451 expression: &LanguageExpressionDef,
452 ) -> Result<Arc<dyn Expression>, CamelError> {
453 let language = self.resolve_language(&expression.language)?;
454 let compiled = language
455 .create_expression(&expression.source)
456 .map_err(|e| {
457 CamelError::RouteError(format!(
458 "failed to compile {} expression `{}`: {e}",
459 expression.language, expression.source
460 ))
461 })?;
462 Ok(Arc::from(compiled))
463 }
464
465 fn compile_language_predicate(
466 &self,
467 expression: &LanguageExpressionDef,
468 ) -> Result<Arc<dyn Predicate>, CamelError> {
469 let language = self.resolve_language(&expression.language)?;
470 let compiled = language.create_predicate(&expression.source).map_err(|e| {
471 CamelError::RouteError(format!(
472 "failed to compile {} predicate `{}`: {e}",
473 expression.language, expression.source
474 ))
475 })?;
476 Ok(Arc::from(compiled))
477 }
478
479 fn compile_filter_predicate(
480 &self,
481 expression: &LanguageExpressionDef,
482 ) -> Result<FilterPredicate, CamelError> {
483 let predicate = self.compile_language_predicate(expression)?;
484 Ok(Arc::new(move |exchange: &Exchange| {
485 predicate.matches(exchange).unwrap_or(false)
486 }))
487 }
488
489 fn value_to_body(value: Value) -> Body {
490 match value {
491 Value::Null => Body::Empty,
492 Value::String(text) => Body::Text(text),
493 other => Body::Json(other),
494 }
495 }
496
497 pub(crate) fn resolve_steps(
499 &self,
500 steps: Vec<BuilderStep>,
501 producer_ctx: &ProducerContext,
502 registry: &Arc<std::sync::Mutex<Registry>>,
503 ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
504 let resolve_producer = |uri: &str| -> Result<BoxProcessor, CamelError> {
505 let parsed = parse_uri(uri)?;
506 let registry_guard = registry
507 .lock()
508 .expect("mutex poisoned: another thread panicked while holding this lock");
509 let component = registry_guard.get_or_err(&parsed.scheme)?;
510 let endpoint = component.create_endpoint(uri)?;
511 endpoint.create_producer(producer_ctx)
512 };
513
514 let mut processors: Vec<(BoxProcessor, Option<camel_api::BodyType>)> = Vec::new();
515 for step in steps {
516 match step {
517 BuilderStep::Processor(svc) => {
518 processors.push((svc, None));
519 }
520 BuilderStep::To(uri) => {
521 let parsed = parse_uri(&uri)?;
522 let registry_guard = registry
523 .lock()
524 .expect("mutex poisoned: another thread panicked while holding this lock");
525 let component = registry_guard.get_or_err(&parsed.scheme)?;
526 let endpoint = component.create_endpoint(&uri)?;
527 let contract = endpoint.body_contract();
528 let producer = endpoint.create_producer(producer_ctx)?;
529 processors.push((producer, contract));
530 }
531 BuilderStep::Stop => {
532 processors.push((BoxProcessor::new(camel_processor::StopService), None));
533 }
534 BuilderStep::Log { level, message } => {
535 let svc = camel_processor::LogProcessor::new(level, message);
536 processors.push((BoxProcessor::new(svc), None));
537 }
538 BuilderStep::DeclarativeSetHeader { key, value } => match value {
539 ValueSourceDef::Literal(value) => {
540 let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
541 processors.push((BoxProcessor::new(svc), None));
542 }
543 ValueSourceDef::Expression(expression) => {
544 let expression = self.compile_language_expression(&expression)?;
545 let svc = camel_processor::DynamicSetHeader::new(
546 IdentityProcessor,
547 key,
548 move |exchange: &Exchange| {
549 expression.evaluate(exchange).unwrap_or(Value::Null)
550 },
551 );
552 processors.push((BoxProcessor::new(svc), None));
553 }
554 },
555 BuilderStep::DeclarativeSetBody { value } => match value {
556 ValueSourceDef::Literal(value) => {
557 let body = Self::value_to_body(value);
558 let svc = camel_processor::SetBody::new(
559 IdentityProcessor,
560 move |_exchange: &Exchange| body.clone(),
561 );
562 processors.push((BoxProcessor::new(svc), None));
563 }
564 ValueSourceDef::Expression(expression) => {
565 let expression = self.compile_language_expression(&expression)?;
566 let svc = camel_processor::SetBody::new(
567 IdentityProcessor,
568 move |exchange: &Exchange| {
569 let value = expression.evaluate(exchange).unwrap_or(Value::Null);
570 Self::value_to_body(value)
571 },
572 );
573 processors.push((BoxProcessor::new(svc), None));
574 }
575 },
576 BuilderStep::DeclarativeFilter { predicate, steps } => {
577 let predicate = self.compile_filter_predicate(&predicate)?;
578 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
579 let sub_processors: Vec<BoxProcessor> =
580 sub_pairs.into_iter().map(|(p, _)| p).collect();
581 let sub_pipeline = compose_pipeline(sub_processors);
582 let svc =
583 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
584 processors.push((BoxProcessor::new(svc), None));
585 }
586 BuilderStep::DeclarativeChoice { whens, otherwise } => {
587 let mut when_clauses = Vec::new();
588 for when_step in whens {
589 let predicate = self.compile_filter_predicate(&when_step.predicate)?;
590 let sub_pairs =
591 self.resolve_steps(when_step.steps, producer_ctx, registry)?;
592 let sub_processors: Vec<BoxProcessor> =
593 sub_pairs.into_iter().map(|(p, _)| p).collect();
594 let pipeline = compose_pipeline(sub_processors);
595 when_clauses.push(WhenClause {
596 predicate,
597 pipeline,
598 });
599 }
600 let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
601 let sub_pairs =
602 self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
603 let sub_processors: Vec<BoxProcessor> =
604 sub_pairs.into_iter().map(|(p, _)| p).collect();
605 Some(compose_pipeline(sub_processors))
606 } else {
607 None
608 };
609 let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
610 processors.push((BoxProcessor::new(svc), None));
611 }
612 BuilderStep::DeclarativeScript { expression } => {
613 let lang = self.resolve_language(&expression.language)?;
614 match lang.create_mutating_expression(&expression.source) {
615 Ok(mut_expr) => {
616 processors
617 .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
618 }
619 Err(LanguageError::NotSupported { .. }) => {
620 let expression = self.compile_language_expression(&expression)?;
628 let svc = camel_processor::SetBody::new(
629 IdentityProcessor,
630 move |exchange: &Exchange| {
631 let value =
632 expression.evaluate(exchange).unwrap_or(Value::Null);
633 Self::value_to_body(value)
634 },
635 );
636 processors.push((BoxProcessor::new(svc), None));
637 }
638 Err(e) => {
639 return Err(CamelError::RouteError(format!(
640 "Failed to create mutating expression for language '{}': {}",
641 expression.language, e
642 )));
643 }
644 }
645 }
646 BuilderStep::Split { config, steps } => {
647 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
648 let sub_processors: Vec<BoxProcessor> =
649 sub_pairs.into_iter().map(|(p, _)| p).collect();
650 let sub_pipeline = compose_pipeline(sub_processors);
651 let splitter =
652 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
653 processors.push((BoxProcessor::new(splitter), None));
654 }
655 BuilderStep::DeclarativeSplit {
656 expression,
657 aggregation,
658 parallel,
659 parallel_limit,
660 stop_on_exception,
661 steps,
662 } => {
663 let lang_expr = self.compile_language_expression(&expression)?;
664 let split_fn = move |exchange: &Exchange| {
665 let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
666 match value {
667 Value::String(s) => s
668 .lines()
669 .filter(|line| !line.is_empty())
670 .map(|line| {
671 let mut fragment = exchange.clone();
672 fragment.input.body = Body::from(line.to_string());
673 fragment
674 })
675 .collect(),
676 Value::Array(arr) => arr
677 .into_iter()
678 .map(|v| {
679 let mut fragment = exchange.clone();
680 fragment.input.body = Body::from(v);
681 fragment
682 })
683 .collect(),
684 _ => vec![exchange.clone()],
685 }
686 };
687
688 let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
689 .aggregation(aggregation)
690 .parallel(parallel)
691 .stop_on_exception(stop_on_exception);
692 if let Some(limit) = parallel_limit {
693 config = config.parallel_limit(limit);
694 }
695
696 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
697 let sub_processors: Vec<BoxProcessor> =
698 sub_pairs.into_iter().map(|(p, _)| p).collect();
699 let sub_pipeline = compose_pipeline(sub_processors);
700 let splitter =
701 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
702 processors.push((BoxProcessor::new(splitter), None));
703 }
704 BuilderStep::Aggregate { config } => {
705 let svc = camel_processor::AggregatorService::new(config);
706 processors.push((BoxProcessor::new(svc), None));
707 }
708 BuilderStep::Filter { predicate, steps } => {
709 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
710 let sub_processors: Vec<BoxProcessor> =
711 sub_pairs.into_iter().map(|(p, _)| p).collect();
712 let sub_pipeline = compose_pipeline(sub_processors);
713 let svc =
714 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
715 processors.push((BoxProcessor::new(svc), None));
716 }
717 BuilderStep::Choice { whens, otherwise } => {
718 let mut when_clauses = Vec::new();
720 for when_step in whens {
721 let sub_pairs =
722 self.resolve_steps(when_step.steps, producer_ctx, registry)?;
723 let sub_processors: Vec<BoxProcessor> =
724 sub_pairs.into_iter().map(|(p, _)| p).collect();
725 let pipeline = compose_pipeline(sub_processors);
726 when_clauses.push(WhenClause {
727 predicate: when_step.predicate,
728 pipeline,
729 });
730 }
731 let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
733 let sub_pairs =
734 self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
735 let sub_processors: Vec<BoxProcessor> =
736 sub_pairs.into_iter().map(|(p, _)| p).collect();
737 Some(compose_pipeline(sub_processors))
738 } else {
739 None
740 };
741 let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
742 processors.push((BoxProcessor::new(svc), None));
743 }
744 BuilderStep::WireTap { uri } => {
745 let producer = resolve_producer(&uri)?;
746 let svc = camel_processor::WireTapService::new(producer);
747 processors.push((BoxProcessor::new(svc), None));
748 }
749 BuilderStep::Multicast { config, steps } => {
750 let mut endpoints = Vec::new();
752 for step in steps {
753 let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
754 let sub_processors: Vec<BoxProcessor> =
755 sub_pairs.into_iter().map(|(p, _)| p).collect();
756 let endpoint = compose_pipeline(sub_processors);
757 endpoints.push(endpoint);
758 }
759 let svc = camel_processor::MulticastService::new(endpoints, config);
760 processors.push((BoxProcessor::new(svc), None));
761 }
762 BuilderStep::DeclarativeLog { level, message } => {
763 let ValueSourceDef::Expression(expression) = message else {
764 unreachable!(
767 "DeclarativeLog with Literal should have been compiled to a Processor"
768 );
769 };
770 let expression = self.compile_language_expression(&expression)?;
771 let svc =
772 camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
773 expression
774 .evaluate(exchange)
775 .unwrap_or_else(|e| {
776 warn!(error = %e, "log expression evaluation failed");
777 Value::Null
778 })
779 .to_string()
780 });
781 processors.push((BoxProcessor::new(svc), None));
782 }
783 BuilderStep::Bean { name, method } => {
784 let beans = self.beans.lock().expect(
786 "beans mutex poisoned: another thread panicked while holding this lock",
787 );
788
789 let bean = beans.get(&name).ok_or_else(|| {
791 CamelError::ProcessorError(format!("Bean not found: {}", name))
792 })?;
793
794 let bean_clone = Arc::clone(&bean);
796 let method = method.clone();
797
798 let processor = tower::service_fn(move |mut exchange: Exchange| {
800 let bean = Arc::clone(&bean_clone);
801 let method = method.clone();
802
803 async move {
804 bean.call(&method, &mut exchange).await?;
805 Ok(exchange)
806 }
807 });
808
809 processors.push((BoxProcessor::new(processor), None));
810 }
811 BuilderStep::Script { language, script } => {
812 let lang = self.resolve_language(&language)?;
813 match lang.create_mutating_expression(&script) {
814 Ok(mut_expr) => {
815 processors
816 .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
817 }
818 Err(LanguageError::NotSupported {
819 feature,
820 language: ref lang_name,
821 }) => {
822 return Err(CamelError::RouteError(format!(
825 "Language '{}' does not support {} (required for .script() step)",
826 lang_name, feature
827 )));
828 }
829 Err(e) => {
830 return Err(CamelError::RouteError(format!(
831 "Failed to create mutating expression for language '{}': {}",
832 language, e
833 )));
834 }
835 }
836 }
837 BuilderStep::Throttle { config, steps } => {
838 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
839 let sub_processors: Vec<BoxProcessor> =
840 sub_pairs.into_iter().map(|(p, _)| p).collect();
841 let sub_pipeline = compose_pipeline(sub_processors);
842 let svc =
843 camel_processor::throttler::ThrottlerService::new(config, sub_pipeline);
844 processors.push((BoxProcessor::new(svc), None));
845 }
846 BuilderStep::LoadBalance { config, steps } => {
847 let mut endpoints = Vec::new();
849 for step in steps {
850 let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
851 let sub_processors: Vec<BoxProcessor> =
852 sub_pairs.into_iter().map(|(p, _)| p).collect();
853 let endpoint = compose_pipeline(sub_processors);
854 endpoints.push(endpoint);
855 }
856 let svc =
857 camel_processor::load_balancer::LoadBalancerService::new(endpoints, config);
858 processors.push((BoxProcessor::new(svc), None));
859 }
860 BuilderStep::DynamicRouter { config } => {
861 use camel_api::EndpointResolver;
862
863 let producer_ctx_clone = producer_ctx.clone();
864 let registry_clone = Arc::clone(registry);
865 let resolver: EndpointResolver = Arc::new(move |uri: &str| {
866 let parsed = match parse_uri(uri) {
867 Ok(p) => p,
868 Err(_) => return None,
869 };
870 let registry_guard = match registry_clone.lock() {
871 Ok(g) => g,
872 Err(_) => return None, };
874 let component = match registry_guard.get_or_err(&parsed.scheme) {
875 Ok(c) => c,
876 Err(_) => return None,
877 };
878 let endpoint = match component.create_endpoint(uri) {
879 Ok(e) => e,
880 Err(_) => return None,
881 };
882 let producer = match endpoint.create_producer(&producer_ctx_clone) {
883 Ok(p) => p,
884 Err(_) => return None,
885 };
886 Some(BoxProcessor::new(producer))
887 });
888 let svc = camel_processor::dynamic_router::DynamicRouterService::new(
889 config, resolver,
890 );
891 processors.push((BoxProcessor::new(svc), None));
892 }
893 BuilderStep::RoutingSlip { config } => {
894 use camel_api::EndpointResolver;
895
896 let producer_ctx_clone = producer_ctx.clone();
897 let registry_clone = registry.clone();
898 let resolver: EndpointResolver = Arc::new(move |uri: &str| {
899 let parsed = match parse_uri(uri) {
900 Ok(p) => p,
901 Err(_) => return None,
902 };
903 let registry_guard = match registry_clone.lock() {
904 Ok(g) => g,
905 Err(_) => return None,
906 };
907 let component = match registry_guard.get_or_err(&parsed.scheme) {
908 Ok(c) => c,
909 Err(_) => return None,
910 };
911 let endpoint = match component.create_endpoint(uri) {
912 Ok(e) => e,
913 Err(_) => return None,
914 };
915 let producer = match endpoint.create_producer(&producer_ctx_clone) {
916 Ok(p) => p,
917 Err(_) => return None,
918 };
919 Some(BoxProcessor::new(producer))
920 });
921
922 let svc =
923 camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
924 processors.push((BoxProcessor::new(svc), None));
925 }
926 }
927 }
928 Ok(processors)
929 }
930
931 pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
941 let route_id = definition.route_id().to_string();
942
943 if self.routes.contains_key(&route_id) {
944 return Err(CamelError::RouteError(format!(
945 "Route '{}' already exists",
946 route_id
947 )));
948 }
949
950 info!(route_id = %route_id, "Adding route to controller");
951
952 let definition_info = definition.to_info();
954 let from_uri = definition.from_uri.to_string();
955 let concurrency = definition.concurrency;
956
957 let producer_ctx = self.build_producer_context()?;
959
960 let processors_with_contracts =
962 self.resolve_steps(definition.steps, &producer_ctx, &self.registry)?;
963 let route_id_for_tracing = route_id.clone();
964 let mut pipeline = compose_traced_pipeline_with_contracts(
965 processors_with_contracts,
966 &route_id_for_tracing,
967 self.tracing_enabled,
968 self.tracer_detail_level.clone(),
969 self.tracer_metrics.clone(),
970 );
971
972 if let Some(cb_config) = definition.circuit_breaker {
974 let cb_layer = CircuitBreakerLayer::new(cb_config);
975 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
976 }
977
978 let eh_config = definition
980 .error_handler
981 .or_else(|| self.global_error_handler.clone());
982
983 if let Some(config) = eh_config {
984 let registry = self
986 .registry
987 .lock()
988 .expect("mutex poisoned: another thread panicked while holding this lock");
989 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
990 pipeline = BoxProcessor::new(layer.layer(pipeline));
991 }
992
993 let uow_counter = if let Some(uow_config) = &definition.unit_of_work {
995 let registry = self
996 .registry
997 .lock()
998 .expect("mutex poisoned: registry lock in add_route uow");
999 let (uow_layer, counter) =
1000 self.resolve_uow_layer(uow_config, &producer_ctx, ®istry, None)?;
1001 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1002 Some(counter)
1003 } else {
1004 None
1005 };
1006
1007 self.routes.insert(
1008 route_id.clone(),
1009 ManagedRoute {
1010 definition: definition_info,
1011 from_uri,
1012 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
1013 concurrency,
1014 consumer_handle: None,
1015 pipeline_handle: None,
1016 consumer_cancel_token: CancellationToken::new(),
1017 pipeline_cancel_token: CancellationToken::new(),
1018 channel_sender: None,
1019 in_flight: uow_counter,
1020 },
1021 );
1022
1023 Ok(())
1024 }
1025
1026 pub fn compile_route_definition(
1031 &self,
1032 def: RouteDefinition,
1033 ) -> Result<BoxProcessor, CamelError> {
1034 let route_id = def.route_id().to_string();
1035
1036 let producer_ctx = self.build_producer_context()?;
1037
1038 let processors_with_contracts =
1039 self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
1040 let mut pipeline = compose_traced_pipeline_with_contracts(
1041 processors_with_contracts,
1042 &route_id,
1043 self.tracing_enabled,
1044 self.tracer_detail_level.clone(),
1045 self.tracer_metrics.clone(),
1046 );
1047
1048 if let Some(cb_config) = def.circuit_breaker {
1049 let cb_layer = CircuitBreakerLayer::new(cb_config);
1050 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
1051 }
1052
1053 let eh_config = def
1054 .error_handler
1055 .clone()
1056 .or_else(|| self.global_error_handler.clone());
1057 if let Some(config) = eh_config {
1058 let registry = self
1060 .registry
1061 .lock()
1062 .expect("mutex poisoned: registry lock in compile_route_definition");
1063 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
1064 pipeline = BoxProcessor::new(layer.layer(pipeline));
1065 }
1066
1067 if let Some(uow_config) = &def.unit_of_work {
1069 let existing_counter = self
1070 .routes
1071 .get(&route_id)
1072 .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
1073
1074 let registry = self
1075 .registry
1076 .lock()
1077 .expect("mutex poisoned: registry lock in compile_route_definition uow");
1078
1079 let (uow_layer, _counter) =
1080 self.resolve_uow_layer(uow_config, &producer_ctx, ®istry, existing_counter)?;
1081
1082 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1083 }
1084
1085 Ok(pipeline)
1086 }
1087
1088 pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1094 let managed = self.routes.get(route_id).ok_or_else(|| {
1095 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
1096 })?;
1097 if handle_is_running(&managed.consumer_handle)
1098 || handle_is_running(&managed.pipeline_handle)
1099 {
1100 return Err(CamelError::RouteError(format!(
1101 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
1102 route_id,
1103 inferred_lifecycle_label(managed)
1104 )));
1105 }
1106 self.routes.remove(route_id);
1107 info!(route_id = %route_id, "Route removed from controller");
1108 Ok(())
1109 }
1110
1111 pub fn route_count(&self) -> usize {
1113 self.routes.len()
1114 }
1115
1116 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1117 self.routes.get(route_id).map(|r| {
1118 r.in_flight
1119 .as_ref()
1120 .map_or(0, |c| c.load(Ordering::Relaxed))
1121 })
1122 }
1123
    /// Returns `true` when a route with the given id is registered.
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.contains_key(route_id)
    }
1128
1129 pub fn route_ids(&self) -> Vec<String> {
1131 self.routes.keys().cloned().collect()
1132 }
1133
1134 pub fn auto_startup_route_ids(&self) -> Vec<String> {
1136 let mut pairs: Vec<(String, i32)> = self
1137 .routes
1138 .iter()
1139 .filter(|(_, managed)| managed.definition.auto_startup())
1140 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1141 .collect();
1142 pairs.sort_by_key(|(_, order)| *order);
1143 pairs.into_iter().map(|(id, _)| id).collect()
1144 }
1145
1146 pub fn shutdown_route_ids(&self) -> Vec<String> {
1148 let mut pairs: Vec<(String, i32)> = self
1149 .routes
1150 .iter()
1151 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1152 .collect();
1153 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1154 pairs.into_iter().map(|(id, _)| id).collect()
1155 }
1156
1157 pub fn swap_pipeline(
1162 &self,
1163 route_id: &str,
1164 new_pipeline: BoxProcessor,
1165 ) -> Result<(), CamelError> {
1166 let managed = self
1167 .routes
1168 .get(route_id)
1169 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1170
1171 managed
1172 .pipeline
1173 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1174 info!(route_id = %route_id, "Pipeline swapped atomically");
1175 Ok(())
1176 }
1177
1178 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1180 self.routes.get(route_id).map(|r| r.from_uri.clone())
1181 }
1182
1183 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1188 self.routes
1189 .get(route_id)
1190 .map(|r| r.pipeline.load().0.clone())
1191 }
1192
1193 async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1195 let managed = self
1196 .routes
1197 .get_mut(route_id)
1198 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1199
1200 if !handle_is_running(&managed.consumer_handle)
1201 && !handle_is_running(&managed.pipeline_handle)
1202 {
1203 return Ok(());
1204 }
1205
1206 info!(route_id = %route_id, "Stopping route");
1207
1208 let managed = self
1210 .routes
1211 .get_mut(route_id)
1212 .expect("invariant: route must exist after prior existence check");
1213 managed.consumer_cancel_token.cancel();
1214 managed.pipeline_cancel_token.cancel();
1215
1216 let managed = self
1218 .routes
1219 .get_mut(route_id)
1220 .expect("invariant: route must exist after prior existence check");
1221 let consumer_handle = managed.consumer_handle.take();
1222 let pipeline_handle = managed.pipeline_handle.take();
1223
1224 let managed = self
1227 .routes
1228 .get_mut(route_id)
1229 .expect("invariant: route must exist after prior existence check");
1230 managed.channel_sender = None;
1231
1232 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1236 match (consumer_handle, pipeline_handle) {
1237 (Some(c), Some(p)) => {
1238 let _ = tokio::join!(c, p);
1239 }
1240 (Some(c), None) => {
1241 let _ = c.await;
1242 }
1243 (None, Some(p)) => {
1244 let _ = p.await;
1245 }
1246 (None, None) => {}
1247 }
1248 })
1249 .await;
1250
1251 if timeout_result.is_err() {
1252 warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
1253 }
1254
1255 let managed = self
1257 .routes
1258 .get_mut(route_id)
1259 .expect("invariant: route must exist after prior existence check");
1260
1261 managed.consumer_cancel_token = CancellationToken::new();
1263 managed.pipeline_cancel_token = CancellationToken::new();
1264
1265 info!(route_id = %route_id, "Route stopped");
1266 Ok(())
1267 }
1268}
1269
1270#[async_trait::async_trait]
1271impl RouteController for DefaultRouteController {
    /// Starts the consumer and pipeline tasks for `route_id`.
    ///
    /// Returns `Ok(())` without doing anything when both tasks are already
    /// running. Otherwise a consumer is created from the route's `from` URI,
    /// a bounded channel is set up between consumer and pipeline, and both
    /// tasks are spawned and stored on the managed route.
    ///
    /// # Errors
    ///
    /// Returns `CamelError::RouteError` when the route is unknown, when only
    /// the pipeline is running (suspended route), or when only the consumer is
    /// running (inconsistent state). Errors from URI parsing, component lookup
    /// and endpoint/consumer creation are propagated.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            // Execution state is derived from which task handles are still running.
            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            if consumer_running && pipeline_running {
                // Already started: nothing to do.
                return Ok(());
            }
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
        }

        info!(route_id = %route_id, "Starting route");

        // Copy out what the spawned tasks need so no borrow of `self.routes` is kept.
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        // Resolve the component by URI scheme and build the consumer while
        // holding the registry lock; the lock is released before any `.await`.
        let parsed = parse_uri(&from_uri)?;
        let registry = self
            .registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock");
        let component = registry.get_or_err(&parsed.scheme)?;
        let endpoint = component.create_endpoint(&from_uri)?;
        let mut consumer = endpoint.create_consumer()?;
        let consumer_concurrency = consumer.concurrency_model();
        drop(registry);

        // A concurrency model set on the route overrides the consumer's own.
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Bounded channel (capacity 256) carrying exchanges from consumer to pipeline.
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        // Child tokens: cancelling the route's tokens cancels these as well.
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        // Extra sender stored on the route; `resume_route` hands it to a new consumer.
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        let route_id_for_consumer = route_id.to_string();
        let consumer_handle = tokio::spawn(async move {
            if let Err(e) = consumer.start(consumer_ctx).await {
                error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
                let error_msg = e.to_string();

                // Report the crash on the optional notification channel;
                // a send failure is ignored.
                if let Some(tx) = crash_notifier {
                    let _ = tx
                        .send(CrashNotification {
                            route_id: route_id_for_consumer.clone(),
                            error: error_msg.clone(),
                        })
                        .await;
                }

                publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
                    .await;
            }
        });

        let pipeline_handle = match effective_concurrency {
            // Sequential: exchanges are processed one at a time inside this task.
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    loop {
                        // Exit when the channel is closed or the pipeline is cancelled.
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return,
                            },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Load the current pipeline per exchange so swapped
                        // pipelines are picked up.
                        let mut pipeline = pipeline.load().0.clone();

                        // If readiness fails, report to the caller (when one
                        // is waiting) and end the whole pipeline task.
                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        // With a reply channel the result goes to the caller;
                        // otherwise errors other than `Stopped` are logged.
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            // Concurrent: one task is spawned per exchange; an optional
            // semaphore caps how many of them run the pipeline at once.
            ConcurrencyModel::Concurrent { max } => {
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return,
                            },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        tokio::spawn(async move {
                            // The permit is held until this per-exchange task ends.
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            let mut pipe = pipe_ref.load().0.clone();

                            // A readiness failure only ends this exchange's
                            // task, not the dispatch loop.
                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };

        // Record the running tasks and the sender on the managed route.
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }
1473
    /// Stops the route by delegating to `stop_route_internal`.
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }
1477
1478 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1479 self.stop_route(route_id).await?;
1480 tokio::time::sleep(Duration::from_millis(100)).await;
1481 self.start_route(route_id).await
1482 }
1483
1484 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1485 let managed = self
1487 .routes
1488 .get_mut(route_id)
1489 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1490
1491 let consumer_running = handle_is_running(&managed.consumer_handle);
1492 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1493
1494 if !consumer_running || !pipeline_running {
1496 return Err(CamelError::RouteError(format!(
1497 "Cannot suspend route '{}' with execution lifecycle {}",
1498 route_id,
1499 inferred_lifecycle_label(managed)
1500 )));
1501 }
1502
1503 info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1504
1505 let managed = self
1507 .routes
1508 .get_mut(route_id)
1509 .expect("invariant: route must exist after prior existence check");
1510 managed.consumer_cancel_token.cancel();
1511
1512 let managed = self
1514 .routes
1515 .get_mut(route_id)
1516 .expect("invariant: route must exist after prior existence check");
1517 let consumer_handle = managed.consumer_handle.take();
1518
1519 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1521 if let Some(handle) = consumer_handle {
1522 let _ = handle.await;
1523 }
1524 })
1525 .await;
1526
1527 if timeout_result.is_err() {
1528 warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1529 }
1530
1531 let managed = self
1533 .routes
1534 .get_mut(route_id)
1535 .expect("invariant: route must exist after prior existence check");
1536
1537 managed.consumer_cancel_token = CancellationToken::new();
1539
1540 info!(route_id = %route_id, "Route suspended (pipeline still running)");
1541 Ok(())
1542 }
1543
1544 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1545 let managed = self
1547 .routes
1548 .get(route_id)
1549 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1550
1551 let consumer_running = handle_is_running(&managed.consumer_handle);
1552 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1553 if consumer_running || !pipeline_running {
1554 return Err(CamelError::RouteError(format!(
1555 "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1556 route_id,
1557 inferred_lifecycle_label(managed)
1558 )));
1559 }
1560
1561 let sender = managed.channel_sender.clone().ok_or_else(|| {
1563 CamelError::RouteError("Suspended route has no channel sender".into())
1564 })?;
1565
1566 let from_uri = managed.from_uri.clone();
1568
1569 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1570
1571 let parsed = parse_uri(&from_uri)?;
1573 let registry = self
1574 .registry
1575 .lock()
1576 .expect("mutex poisoned: another thread panicked while holding this lock");
1577 let component = registry.get_or_err(&parsed.scheme)?;
1578 let endpoint = component.create_endpoint(&from_uri)?;
1579 let mut consumer = endpoint.create_consumer()?;
1580 drop(registry);
1582
1583 let managed = self
1585 .routes
1586 .get_mut(route_id)
1587 .expect("invariant: route must exist after prior existence check");
1588
1589 let consumer_cancel = managed.consumer_cancel_token.child_token();
1591
1592 let crash_notifier = self.crash_notifier.clone();
1593 let runtime_for_consumer = self.runtime.clone();
1594
1595 let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1597
1598 let route_id_for_consumer = route_id.to_string();
1600 let consumer_handle = tokio::spawn(async move {
1601 if let Err(e) = consumer.start(consumer_ctx).await {
1602 error!(route_id = %route_id_for_consumer, "Consumer error on resume: {e}");
1603 let error_msg = e.to_string();
1604
1605 if let Some(tx) = crash_notifier {
1607 let _ = tx
1608 .send(CrashNotification {
1609 route_id: route_id_for_consumer.clone(),
1610 error: error_msg.clone(),
1611 })
1612 .await;
1613 }
1614
1615 publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1616 .await;
1617 }
1618 });
1619
1620 let managed = self
1622 .routes
1623 .get_mut(route_id)
1624 .expect("invariant: route must exist after prior existence check");
1625 managed.consumer_handle = Some(consumer_handle);
1626
1627 info!(route_id = %route_id, "Route resumed");
1628 Ok(())
1629 }
1630
1631 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1632 let route_ids: Vec<String> = {
1635 let mut pairs: Vec<_> = self
1636 .routes
1637 .iter()
1638 .filter(|(_, r)| r.definition.auto_startup())
1639 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1640 .collect();
1641 pairs.sort_by_key(|(_, order)| *order);
1642 pairs.into_iter().map(|(id, _)| id).collect()
1643 };
1644
1645 info!("Starting {} auto-startup routes", route_ids.len());
1646
1647 let mut errors: Vec<String> = Vec::new();
1649 for route_id in route_ids {
1650 if let Err(e) = self.start_route(&route_id).await {
1651 errors.push(format!("Route '{}': {}", route_id, e));
1652 }
1653 }
1654
1655 if !errors.is_empty() {
1656 return Err(CamelError::RouteError(format!(
1657 "Failed to start routes: {}",
1658 errors.join(", ")
1659 )));
1660 }
1661
1662 info!("All auto-startup routes started");
1663 Ok(())
1664 }
1665
1666 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1667 let route_ids: Vec<String> = {
1669 let mut pairs: Vec<_> = self
1670 .routes
1671 .iter()
1672 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1673 .collect();
1674 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1675 pairs.into_iter().map(|(id, _)| id).collect()
1676 };
1677
1678 info!("Stopping {} routes", route_ids.len());
1679
1680 for route_id in route_ids {
1681 let _ = self.stop_route(&route_id).await;
1682 }
1683
1684 info!("All routes stopped");
1685 Ok(())
1686 }
1687}
1688
1689#[async_trait::async_trait]
1690impl RouteControllerInternal for DefaultRouteController {
1691 fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
1692 DefaultRouteController::add_route(self, def)
1693 }
1694
1695 fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError> {
1696 DefaultRouteController::swap_pipeline(self, route_id, pipeline)
1697 }
1698
1699 fn route_from_uri(&self, route_id: &str) -> Option<String> {
1700 DefaultRouteController::route_from_uri(self, route_id)
1702 }
1703
1704 fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
1705 DefaultRouteController::set_error_handler(self, config)
1706 }
1707
1708 fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
1709 DefaultRouteController::set_self_ref(self, self_ref)
1710 }
1711
1712 fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
1713 DefaultRouteController::set_runtime_handle(self, runtime)
1714 }
1715
1716 fn route_count(&self) -> usize {
1717 DefaultRouteController::route_count(self)
1718 }
1719
1720 fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1721 DefaultRouteController::in_flight_count(self, route_id)
1722 }
1723
1724 fn route_exists(&self, route_id: &str) -> bool {
1725 DefaultRouteController::route_exists(self, route_id)
1726 }
1727
1728 fn route_ids(&self) -> Vec<String> {
1729 DefaultRouteController::route_ids(self)
1730 }
1731
1732 fn auto_startup_route_ids(&self) -> Vec<String> {
1733 DefaultRouteController::auto_startup_route_ids(self)
1734 }
1735
1736 fn shutdown_route_ids(&self) -> Vec<String> {
1737 DefaultRouteController::shutdown_route_ids(self)
1738 }
1739
1740 fn set_tracer_config(&mut self, config: &TracerConfig) {
1741 DefaultRouteController::set_tracer_config(self, config)
1742 }
1743
1744 fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError> {
1745 DefaultRouteController::compile_route_definition(self, def)
1746 }
1747
1748 fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1749 DefaultRouteController::remove_route(self, route_id)
1750 }
1751
1752 async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1753 DefaultRouteController::start_route(self, route_id).await
1754 }
1755
1756 async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1757 DefaultRouteController::stop_route(self, route_id).await
1758 }
1759}
1760
1761#[cfg(test)]
1762mod tests {
1763 use super::*;
1764 use crate::shared::components::domain::Registry;
1765
1766 fn build_controller() -> DefaultRouteController {
1767 DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())))
1768 }
1769
1770 fn build_controller_with_components() -> DefaultRouteController {
1771 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
1772 {
1773 let mut guard = registry.lock().expect("registry lock");
1774 guard.register(camel_component_timer::TimerComponent::new());
1775 guard.register(camel_component_mock::MockComponent::new());
1776 guard.register(camel_component_log::LogComponent::new());
1777 }
1778 DefaultRouteController::new(registry)
1779 }
1780
1781 fn set_self_ref(controller: &mut DefaultRouteController) {
1782 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
1783 let other: Arc<Mutex<dyn RouteController>> =
1784 Arc::new(Mutex::new(DefaultRouteController::new(registry)));
1785 controller.set_self_ref(other);
1786 }
1787
1788 fn register_simple_language(controller: &mut DefaultRouteController) {
1789 controller.languages.lock().expect("languages lock").insert(
1790 "simple".into(),
1791 Arc::new(camel_language_simple::SimpleLanguage),
1792 );
1793 }
1794
    /// Compile-time check: naming `Box<dyn RouteControllerInternal>` only
    /// type-checks if the trait can be used as a trait object.
    #[test]
    fn test_route_controller_internal_is_object_safe() {
        let _: Option<Box<dyn RouteControllerInternal>> = None;
    }
1799
1800 #[test]
1801 fn helper_functions_cover_non_async_branches() {
1802 let managed = ManagedRoute {
1803 definition: RouteDefinition::new("timer:a", vec![])
1804 .with_route_id("r")
1805 .to_info(),
1806 from_uri: "timer:a".into(),
1807 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(BoxProcessor::new(
1808 IdentityProcessor,
1809 )))),
1810 concurrency: None,
1811 consumer_handle: None,
1812 pipeline_handle: None,
1813 consumer_cancel_token: CancellationToken::new(),
1814 pipeline_cancel_token: CancellationToken::new(),
1815 channel_sender: None,
1816 in_flight: None,
1817 };
1818
1819 assert_eq!(inferred_lifecycle_label(&managed), "Stopped");
1820 assert!(!handle_is_running(&managed.consumer_handle));
1821
1822 let cmd = runtime_failure_command("route-x", "boom");
1823 match cmd {
1824 RuntimeCommand::FailRoute {
1825 route_id, error, ..
1826 } => {
1827 assert_eq!(route_id, "route-x");
1828 assert_eq!(error, "boom");
1829 }
1830 _ => panic!("expected FailRoute command"),
1831 }
1832 }
1833
1834 #[test]
1835 fn add_route_detects_duplicates() {
1836 let mut controller = build_controller();
1837 set_self_ref(&mut controller);
1838
1839 controller
1840 .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
1841 .expect("add route");
1842
1843 let dup_err = controller
1844 .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
1845 .expect_err("duplicate must fail");
1846 assert!(dup_err.to_string().contains("already exists"));
1847 }
1848
1849 #[test]
1850 fn route_introspection_and_ordering_helpers_work() {
1851 let mut controller = build_controller();
1852 set_self_ref(&mut controller);
1853
1854 controller
1855 .add_route(
1856 RouteDefinition::new("timer:a", vec![])
1857 .with_route_id("a")
1858 .with_startup_order(20),
1859 )
1860 .unwrap();
1861 controller
1862 .add_route(
1863 RouteDefinition::new("timer:b", vec![])
1864 .with_route_id("b")
1865 .with_startup_order(10),
1866 )
1867 .unwrap();
1868 controller
1869 .add_route(
1870 RouteDefinition::new("timer:c", vec![])
1871 .with_route_id("c")
1872 .with_auto_startup(false)
1873 .with_startup_order(5),
1874 )
1875 .unwrap();
1876
1877 assert_eq!(controller.route_count(), 3);
1878 assert_eq!(controller.route_from_uri("a"), Some("timer:a".into()));
1879 assert!(controller.route_ids().contains(&"a".to_string()));
1880 assert_eq!(
1881 controller.auto_startup_route_ids(),
1882 vec!["b".to_string(), "a".to_string()]
1883 );
1884 assert_eq!(
1885 controller.shutdown_route_ids(),
1886 vec!["a".to_string(), "b".to_string(), "c".to_string()]
1887 );
1888 }
1889
1890 #[test]
1891 fn swap_pipeline_and_remove_route_behaviors() {
1892 let mut controller = build_controller();
1893 set_self_ref(&mut controller);
1894
1895 controller
1896 .add_route(RouteDefinition::new("timer:a", vec![]).with_route_id("swap"))
1897 .unwrap();
1898
1899 controller
1900 .swap_pipeline("swap", BoxProcessor::new(IdentityProcessor))
1901 .unwrap();
1902 assert!(controller.get_pipeline("swap").is_some());
1903
1904 controller.remove_route("swap").unwrap();
1905 assert_eq!(controller.route_count(), 0);
1906
1907 let err = controller
1908 .remove_route("swap")
1909 .expect_err("missing route must fail");
1910 assert!(err.to_string().contains("not found"));
1911 }
1912
    /// Feeds one instance of many `BuilderStep` variants (declarative and EIP
    /// steps) through `resolve_steps` and checks that resolution succeeds and
    /// yields a non-empty result.
    #[test]
    fn resolve_steps_covers_declarative_and_eip_variants() {
        use camel_api::LanguageExpressionDef;
        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

        let mut controller = build_controller_with_components();
        set_self_ref(&mut controller);
        register_simple_language(&mut controller);

        // Shorthand for an expression in the registered "simple" language.
        let expr = |source: &str| LanguageExpressionDef {
            language: "simple".into(),
            source: source.into(),
        };

        let steps = vec![
            BuilderStep::To("mock:out".into()),
            BuilderStep::Stop,
            BuilderStep::Log {
                level: camel_processor::LogLevel::Info,
                message: "log".into(),
            },
            BuilderStep::DeclarativeSetHeader {
                key: "k".into(),
                value: ValueSourceDef::Literal(Value::String("v".into())),
            },
            BuilderStep::DeclarativeSetHeader {
                key: "k2".into(),
                value: ValueSourceDef::Expression(expr("${body}")),
            },
            BuilderStep::DeclarativeSetBody {
                value: ValueSourceDef::Expression(expr("${body}")),
            },
            BuilderStep::DeclarativeFilter {
                predicate: expr("${body} != null"),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeChoice {
                whens: vec![
                    crate::lifecycle::application::route_definition::DeclarativeWhenStep {
                        predicate: expr("${body} == 'x'"),
                        steps: vec![BuilderStep::Stop],
                    },
                ],
                otherwise: Some(vec![BuilderStep::Stop]),
            },
            BuilderStep::DeclarativeScript {
                expression: expr("${body}"),
            },
            BuilderStep::Split {
                config: SplitterConfig::new(split_body_lines())
                    .aggregation(AggregationStrategy::CollectAll),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeSplit {
                expression: expr("${body}"),
                aggregation: AggregationStrategy::Original,
                parallel: false,
                parallel_limit: Some(2),
                stop_on_exception: true,
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Aggregate {
                config: camel_api::AggregatorConfig::correlate_by("id")
                    .complete_when_size(1)
                    .build(),
            },
            BuilderStep::Filter {
                predicate: Arc::new(|_| true),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Choice {
                whens: vec![crate::lifecycle::application::route_definition::WhenStep {
                    predicate: Arc::new(|_| true),
                    steps: vec![BuilderStep::Stop],
                }],
                otherwise: Some(vec![BuilderStep::Stop]),
            },
            BuilderStep::WireTap {
                uri: "mock:tap".into(),
            },
            BuilderStep::Multicast {
                steps: vec![
                    BuilderStep::To("mock:m1".into()),
                    BuilderStep::To("mock:m2".into()),
                ],
                config: camel_api::MulticastConfig::new(),
            },
            BuilderStep::DeclarativeLog {
                level: camel_processor::LogLevel::Info,
                message: ValueSourceDef::Expression(expr("${body}")),
            },
            BuilderStep::Throttle {
                config: camel_api::ThrottlerConfig::new(10, Duration::from_millis(100)),
                steps: vec![BuilderStep::To("mock:t".into())],
            },
            BuilderStep::LoadBalance {
                config: camel_api::LoadBalancerConfig::round_robin(),
                steps: vec![
                    BuilderStep::To("mock:l1".into()),
                    BuilderStep::To("mock:l2".into()),
                ],
            },
            BuilderStep::DynamicRouter {
                config: camel_api::DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
            },
            BuilderStep::RoutingSlip {
                config: camel_api::RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
            },
        ];

        let producer_ctx = ProducerContext::new();
        let resolved = controller
            .resolve_steps(steps, &producer_ctx, &controller.registry)
            .expect("resolve should succeed");
        assert!(!resolved.is_empty());
    }
2029
2030 #[test]
2031 fn resolve_steps_script_requires_mutating_language_support() {
2032 use camel_api::LanguageExpressionDef;
2033
2034 let mut controller = build_controller_with_components();
2035 set_self_ref(&mut controller);
2036 register_simple_language(&mut controller);
2037
2038 let steps = vec![BuilderStep::Script {
2039 language: "simple".into(),
2040 script: "${body}".into(),
2041 }];
2042
2043 let err = controller
2044 .resolve_steps(steps, &ProducerContext::new(), &controller.registry)
2045 .expect_err("simple script should fail for mutating expression");
2046 assert!(err.to_string().contains("does not support"));
2047
2048 let bean_missing = vec![BuilderStep::Bean {
2049 name: "unknown".into(),
2050 method: "run".into(),
2051 }];
2052 let bean_err = controller
2053 .resolve_steps(bean_missing, &ProducerContext::new(), &controller.registry)
2054 .expect_err("missing bean must fail");
2055 assert!(bean_err.to_string().contains("Bean not found"));
2056
2057 let bad_declarative = vec![BuilderStep::DeclarativeScript {
2058 expression: LanguageExpressionDef {
2059 language: "unknown".into(),
2060 source: "x".into(),
2061 },
2062 }];
2063 let lang_err = controller
2064 .resolve_steps(
2065 bad_declarative,
2066 &ProducerContext::new(),
2067 &controller.registry,
2068 )
2069 .expect_err("unknown language must fail");
2070 assert!(lang_err.to_string().contains("not registered"));
2071 }
2072
2073 #[tokio::test]
2074 async fn lifecycle_methods_report_missing_routes() {
2075 let mut controller = build_controller();
2076
2077 assert!(controller.start_route("missing").await.is_err());
2078 assert!(controller.stop_route("missing").await.is_err());
2079 assert!(controller.suspend_route("missing").await.is_err());
2080 assert!(controller.resume_route("missing").await.is_err());
2081 }
2082
2083 #[tokio::test]
2084 async fn start_stop_route_happy_path_with_timer_and_mock() {
2085 let mut controller = build_controller_with_components();
2086 set_self_ref(&mut controller);
2087
2088 let route = RouteDefinition::new(
2089 "timer:tick?period=10&repeatCount=1",
2090 vec![BuilderStep::To("mock:out".into())],
2091 )
2092 .with_route_id("rt-1");
2093 controller.add_route(route).unwrap();
2094
2095 controller.start_route("rt-1").await.unwrap();
2096 tokio::time::sleep(Duration::from_millis(40)).await;
2097 controller.stop_route("rt-1").await.unwrap();
2098
2099 controller.remove_route("rt-1").unwrap();
2100 }
2101
2102 #[tokio::test]
2103 async fn suspend_resume_and_restart_cover_execution_transitions() {
2104 let mut controller = build_controller_with_components();
2105 set_self_ref(&mut controller);
2106
2107 let route = RouteDefinition::new(
2108 "timer:tick?period=30",
2109 vec![BuilderStep::To("mock:out".into())],
2110 )
2111 .with_route_id("rt-2");
2112 controller.add_route(route).unwrap();
2113
2114 controller.start_route("rt-2").await.unwrap();
2115 controller.suspend_route("rt-2").await.unwrap();
2116 controller.resume_route("rt-2").await.unwrap();
2117 controller.restart_route("rt-2").await.unwrap();
2118 controller.stop_route("rt-2").await.unwrap();
2119 }
2120
2121 #[tokio::test]
2122 async fn remove_route_rejects_running_route() {
2123 let mut controller = build_controller_with_components();
2124 set_self_ref(&mut controller);
2125
2126 let route = RouteDefinition::new(
2127 "timer:tick?period=25",
2128 vec![BuilderStep::To("mock:out".into())],
2129 )
2130 .with_route_id("rt-running");
2131 controller.add_route(route).unwrap();
2132 controller.start_route("rt-running").await.unwrap();
2133
2134 let err = controller
2135 .remove_route("rt-running")
2136 .expect_err("running route removal must fail");
2137 assert!(err.to_string().contains("must be stopped before removal"));
2138
2139 controller.stop_route("rt-running").await.unwrap();
2140 controller.remove_route("rt-running").unwrap();
2141 }
2142
2143 #[tokio::test]
2144 async fn start_route_on_suspended_state_returns_guidance_error() {
2145 let mut controller = build_controller_with_components();
2146 set_self_ref(&mut controller);
2147
2148 let route = RouteDefinition::new(
2149 "timer:tick?period=40",
2150 vec![BuilderStep::To("mock:out".into())],
2151 )
2152 .with_route_id("rt-suspend");
2153 controller.add_route(route).unwrap();
2154
2155 controller.start_route("rt-suspend").await.unwrap();
2156 controller.suspend_route("rt-suspend").await.unwrap();
2157
2158 let err = controller
2159 .start_route("rt-suspend")
2160 .await
2161 .expect_err("start from suspended must fail");
2162 assert!(err.to_string().contains("use resume_route"));
2163
2164 controller.resume_route("rt-suspend").await.unwrap();
2165 controller.stop_route("rt-suspend").await.unwrap();
2166 }
2167
2168 #[tokio::test]
2169 async fn suspend_and_resume_validate_execution_state() {
2170 let mut controller = build_controller_with_components();
2171 set_self_ref(&mut controller);
2172
2173 controller
2174 .add_route(
2175 RouteDefinition::new("timer:tick?period=50", vec![]).with_route_id("rt-state"),
2176 )
2177 .unwrap();
2178
2179 let suspend_err = controller
2180 .suspend_route("rt-state")
2181 .await
2182 .expect_err("suspend before start must fail");
2183 assert!(suspend_err.to_string().contains("Cannot suspend route"));
2184
2185 controller.start_route("rt-state").await.unwrap();
2186 let resume_err = controller
2187 .resume_route("rt-state")
2188 .await
2189 .expect_err("resume while started must fail");
2190 assert!(resume_err.to_string().contains("Cannot resume route"));
2191
2192 controller.stop_route("rt-state").await.unwrap();
2193 }
2194
2195 #[tokio::test]
2196 async fn concurrent_concurrency_override_path_executes() {
2197 let mut controller = build_controller_with_components();
2198 set_self_ref(&mut controller);
2199
2200 let route = RouteDefinition::new(
2201 "timer:tick?period=10&repeatCount=2",
2202 vec![BuilderStep::To("mock:out".into())],
2203 )
2204 .with_route_id("rt-concurrent")
2205 .with_concurrency(ConcurrencyModel::Concurrent { max: Some(2) });
2206
2207 controller.add_route(route).unwrap();
2208 controller.start_route("rt-concurrent").await.unwrap();
2209 tokio::time::sleep(Duration::from_millis(50)).await;
2210 controller.stop_route("rt-concurrent").await.unwrap();
2211 }
2212
2213 #[tokio::test]
2214 async fn add_route_with_circuit_breaker_and_error_handler_compiles() {
2215 use camel_api::circuit_breaker::CircuitBreakerConfig;
2216 use camel_api::error_handler::ErrorHandlerConfig;
2217
2218 let mut controller = build_controller_with_components();
2219 set_self_ref(&mut controller);
2220
2221 let route = RouteDefinition::new("timer:tick?period=25", vec![BuilderStep::Stop])
2222 .with_route_id("rt-eh")
2223 .with_circuit_breaker(CircuitBreakerConfig::new())
2224 .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"));
2225
2226 controller
2227 .add_route(route)
2228 .expect("route with layers should compile");
2229 controller.start_route("rt-eh").await.unwrap();
2230 controller.stop_route("rt-eh").await.unwrap();
2231 }
2232
2233 #[tokio::test]
2234 async fn compile_and_swap_errors_for_missing_route() {
2235 let mut controller = build_controller_with_components();
2236 set_self_ref(&mut controller);
2237
2238 let compiled = controller
2239 .compile_route_definition(
2240 RouteDefinition::new("timer:tick?period=10", vec![BuilderStep::Stop])
2241 .with_route_id("compiled"),
2242 )
2243 .expect("compile should work");
2244
2245 let err = controller
2246 .swap_pipeline("nope", compiled)
2247 .expect_err("missing route swap must fail");
2248 assert!(err.to_string().contains("not found"));
2249 }
2250}