1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::{Mutex, mpsc};
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21 BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProducerContext,
22 RouteController, RuntimeCommand, RuntimeHandle, Value, body::Body,
23};
24use camel_component_api::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
25use camel_endpoint::parse_uri;
26use camel_language_api::{Expression, Language, LanguageError, Predicate};
27pub use camel_processor::aggregator::SharedLanguageRegistry;
28use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
29use camel_processor::circuit_breaker::CircuitBreakerLayer;
30use camel_processor::error_handler::ErrorHandlerLayer;
31use camel_processor::script_mutator::ScriptMutator;
32use camel_processor::{ChoiceService, WhenClause};
33
34use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
35use crate::lifecycle::adapters::route_compiler::{
36 compose_pipeline, compose_traced_pipeline_with_contracts,
37};
38use crate::lifecycle::application::route_definition::{
39 BuilderStep, LanguageExpressionDef, RouteDefinition, RouteDefinitionInfo, ValueSourceDef,
40};
41use crate::shared::components::domain::Registry;
42use crate::shared::observability::domain::{DetailLevel, TracerConfig};
43use arc_swap::ArcSwap;
44use camel_bean::BeanRegistry;
45
/// Notification payload naming a route and the error associated with it.
///
/// A sender for values of this type is stored on the controller through
/// `DefaultRouteController::set_crash_notifier`. The code that actually sends the
/// notification is not visible in this part of the file.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// Identifier of the route the notification is about.
    pub route_id: String,
    /// Textual description of the error.
    pub error: String,
}
57
/// Newtype around [`BoxProcessor`] that is manually marked `Sync` so it can be stored in
/// the `Arc<ArcSwap<_>>` used for `SharedPipeline`.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: NOTE(review) — this impl asserts that sharing `&SyncBoxProcessor` between threads
// is sound. The justifying invariant is not visible in this part of the file; presumably the
// wrapped processor is never driven through a shared reference. TODO confirm against the use
// sites and record the invariant here.
unsafe impl Sync for SyncBoxProcessor {}
74
/// A pipeline behind an `ArcSwap`, shared through an `Arc`; `ArcSwap` allows the stored
/// processor to be replaced atomically.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
76
/// Crate-internal extension of [`RouteController`] with route management, compilation and
/// introspection operations.
///
/// Only the signatures are visible here; behavioural notes marked "presumably" are inferred
/// from names and should be confirmed against the implementation.
#[async_trait::async_trait]
pub trait RouteControllerInternal: RouteController + Send {
    /// Adds a route built from `def` to the controller.
    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError>;

    /// Replaces the pipeline of the route identified by `route_id` with `pipeline`.
    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError>;

    /// Returns the `from` URI of the given route, or `None` (presumably when the route is
    /// unknown).
    fn route_from_uri(&self, route_id: &str) -> Option<String>;

    /// Sets the error handler configuration used by the controller.
    fn set_error_handler(&mut self, config: ErrorHandlerConfig);

    /// Hands the controller a shared handle to a `RouteController` (presumably itself).
    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>);

    /// Hands the controller a runtime handle.
    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>);

    /// Number of routes known to the controller.
    fn route_count(&self) -> usize;

    /// In-flight exchange count for a route; `None` presumably means the route is unknown or
    /// is not tracked.
    fn in_flight_count(&self, route_id: &str) -> Option<u64>;

    /// Whether a route with this id is registered.
    fn route_exists(&self, route_id: &str) -> bool;

    /// Ids of all registered routes.
    fn route_ids(&self) -> Vec<String>;

    /// Ids of the routes to start automatically (ordering not visible here).
    fn auto_startup_route_ids(&self) -> Vec<String>;

    /// Ids of the routes to shut down (ordering not visible here).
    fn shutdown_route_ids(&self) -> Vec<String>;

    /// Applies tracer settings taken from `config`.
    fn set_tracer_config(&mut self, config: &TracerConfig);

    /// Compiles `def` into a processor and returns it.
    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError>;

    /// Removes the route identified by `route_id`.
    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Starts a route; presumably the variant used during hot reload.
    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Stops a route; presumably the variant used during hot reload.
    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
}
136
/// Pieces of a route that has been split around a top-level aggregate step.
///
/// `add_route` builds this when `find_top_level_aggregate_with_timeout` reports a match.
struct AggregateSplitInfo {
    /// Pipeline composed from the steps that precede the aggregate step.
    pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step itself.
    agg_config: AggregatorConfig,
    /// Pipeline composed from the steps that follow the aggregate step.
    post_pipeline: SharedPipeline,
}
143
/// Per-route bookkeeping held by the controller.
///
/// Most of the code that populates and reads these fields lies outside this part of the
/// file; notes marked "presumably" are assumptions to confirm.
struct ManagedRoute {
    /// Summary information about the route definition.
    definition: RouteDefinitionInfo,
    /// The route's `from` URI.
    from_uri: String,
    /// The route's swappable processing pipeline.
    pipeline: SharedPipeline,
    /// Optional concurrency model for the route.
    concurrency: Option<ConcurrencyModel>,
    /// Join handle of the consumer task. `inferred_lifecycle_label` treats the consumer as
    /// running while this is `Some` and the task has not finished.
    consumer_handle: Option<JoinHandle<()>>,
    /// Join handle of the pipeline task, interpreted the same way as `consumer_handle`.
    pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token, presumably for the consumer task.
    consumer_cancel_token: CancellationToken,
    /// Cancellation token, presumably for the pipeline task.
    pipeline_cancel_token: CancellationToken,
    /// Sender half of a channel carrying `ExchangeEnvelope`s, when one exists.
    channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// Shared atomic counter, presumably the number of in-flight exchanges.
    in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Present when the route was split around a top-level aggregate step.
    aggregate_split: Option<AggregateSplitInfo>,
    /// Aggregator service shared behind a standard mutex, when one exists.
    agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
171
172fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
173 handle.as_ref().is_some_and(|h| !h.is_finished())
174}
175
176fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
177 match (
178 handle_is_running(&managed.consumer_handle),
179 handle_is_running(&managed.pipeline_handle),
180 ) {
181 (true, true) => "Started",
182 (false, true) => "Suspended",
183 (true, false) => "Stopping",
184 (false, false) => "Stopped",
185 }
186}
187
188fn find_top_level_aggregate_with_timeout(
189 steps: &[BuilderStep],
190) -> Option<(usize, AggregatorConfig)> {
191 for (i, step) in steps.iter().enumerate() {
192 if let BuilderStep::Aggregate { config } = step {
193 if has_timeout_condition(&config.completion) {
194 return Some((i, config.clone()));
195 }
196 break;
197 }
198 }
199 None
200}
201
202fn is_pending(ex: &Exchange) -> bool {
203 ex.property("CamelAggregatorPending")
204 .and_then(|v| v.as_bool())
205 .unwrap_or(false)
206}
207
208async fn ready_with_backoff(
215 pipeline: &mut BoxProcessor,
216 cancel: &CancellationToken,
217) -> Result<(), CamelError> {
218 loop {
219 match pipeline.ready().await {
220 Ok(_) => return Ok(()),
221 Err(CamelError::CircuitOpen(ref msg)) => {
222 warn!("Circuit open, backing off: {msg}");
223 tokio::select! {
224 _ = tokio::time::sleep(Duration::from_secs(1)) => {
225 continue;
226 }
227 _ = cancel.cancelled() => {
228 return Err(CamelError::CircuitOpen(msg.clone()));
230 }
231 }
232 }
233 Err(e) => {
234 error!("Pipeline not ready: {e}");
235 return Err(e);
236 }
237 }
238 }
239}
240
241fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
242 let stamp = std::time::SystemTime::now()
243 .duration_since(std::time::UNIX_EPOCH)
244 .unwrap_or_default()
245 .as_nanos();
246 RuntimeCommand::FailRoute {
247 route_id: route_id.to_string(),
248 error: error.to_string(),
249 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
250 causation_id: None,
251 }
252}
253
254async fn publish_runtime_failure(
255 runtime: Option<Weak<dyn RuntimeHandle>>,
256 route_id: &str,
257 error: &str,
258) {
259 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
260 return;
261 };
262 let command = runtime_failure_command(route_id, error);
263 if let Err(runtime_error) = runtime.execute(command).await {
264 warn!(
265 route_id = %route_id,
266 error = %runtime_error,
267 "failed to synchronize route crash with runtime projection"
268 );
269 }
270}
271
/// Default route controller: owns the managed routes together with the registries and
/// settings needed to compile route definitions into pipelines.
pub struct DefaultRouteController {
    /// Managed routes keyed by route id.
    routes: HashMap<String, ManagedRoute>,
    /// Component registry used to resolve endpoint URIs into producers.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Language registry used to compile expressions and predicates.
    languages: SharedLanguageRegistry,
    /// Bean registry consulted when resolving `Bean` steps.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Shared handle to a `RouteController`, exposed via `self_ref_for_supervision`.
    self_ref: Option<Arc<Mutex<dyn RouteController>>>,
    /// Weak handle to the runtime; a weak reference, so the controller does not keep it alive.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Error handler configuration used when a route does not define its own.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Channel on which `CrashNotification`s can be sent, when configured.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled when composing route pipelines.
    tracing_enabled: bool,
    /// Detail level passed to the tracer when composing route pipelines.
    tracer_detail_level: DetailLevel,
    /// Optional metrics collector passed to the tracer.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
}
304
305impl DefaultRouteController {
306 pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
308 Self::with_beans(
309 registry,
310 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
311 )
312 }
313
314 pub fn with_beans(
316 registry: Arc<std::sync::Mutex<Registry>>,
317 beans: Arc<std::sync::Mutex<BeanRegistry>>,
318 ) -> Self {
319 Self {
320 routes: HashMap::new(),
321 registry,
322 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
323 beans,
324 self_ref: None,
325 runtime: None,
326 global_error_handler: None,
327 crash_notifier: None,
328 tracing_enabled: false,
329 tracer_detail_level: DetailLevel::Minimal,
330 tracer_metrics: None,
331 }
332 }
333
334 pub fn with_languages(
336 registry: Arc<std::sync::Mutex<Registry>>,
337 languages: SharedLanguageRegistry,
338 ) -> Self {
339 Self {
340 routes: HashMap::new(),
341 registry,
342 languages,
343 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
344 self_ref: None,
345 runtime: None,
346 global_error_handler: None,
347 crash_notifier: None,
348 tracing_enabled: false,
349 tracer_detail_level: DetailLevel::Minimal,
350 tracer_metrics: None,
351 }
352 }
353
354 pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
358 self.self_ref = Some(self_ref);
359 }
360
361 pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
363 self.runtime = Some(Arc::downgrade(&runtime));
364 }
365
366 pub fn self_ref_for_supervision(&self) -> Option<Arc<Mutex<dyn RouteController>>> {
371 self.self_ref.clone()
372 }
373
374 pub fn runtime_handle_for_supervision(&self) -> Option<Arc<dyn RuntimeHandle>> {
376 self.runtime.as_ref().and_then(Weak::upgrade)
377 }
378
379 pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
384 self.crash_notifier = Some(tx);
385 }
386
387 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
389 self.global_error_handler = Some(config);
390 }
391
392 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
394 self.tracing_enabled = config.enabled;
395 self.tracer_detail_level = config.detail_level.clone();
396 self.tracer_metrics = config.metrics_collector.clone();
397 }
398
399 fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
400 let mut producer_ctx = ProducerContext::new();
401 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
402 producer_ctx = producer_ctx.with_runtime(runtime);
403 }
404 Ok(producer_ctx)
405 }
406
407 fn resolve_error_handler(
409 &self,
410 config: ErrorHandlerConfig,
411 producer_ctx: &ProducerContext,
412 registry: &Registry,
413 ) -> Result<ErrorHandlerLayer, CamelError> {
414 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
416 let parsed = parse_uri(uri)?;
417 let component = registry.get_or_err(&parsed.scheme)?;
418 let endpoint = component.create_endpoint(uri)?;
419 Some(endpoint.create_producer(producer_ctx)?)
420 } else {
421 None
422 };
423
424 let mut resolved_policies = Vec::new();
426 for policy in config.policies {
427 let handler_producer = if let Some(ref uri) = policy.handled_by {
428 let parsed = parse_uri(uri)?;
429 let component = registry.get_or_err(&parsed.scheme)?;
430 let endpoint = component.create_endpoint(uri)?;
431 Some(endpoint.create_producer(producer_ctx)?)
432 } else {
433 None
434 };
435 resolved_policies.push((policy, handler_producer));
436 }
437
438 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
439 }
440
441 fn resolve_uow_layer(
444 &self,
445 config: &UnitOfWorkConfig,
446 producer_ctx: &ProducerContext,
447 registry: &Registry,
448 counter: Option<Arc<AtomicU64>>,
449 ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
450 let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
451 let parsed = parse_uri(uri)?;
452 let component = registry.get_or_err(&parsed.scheme)?;
453 let endpoint = component.create_endpoint(uri)?;
454 endpoint.create_producer(producer_ctx).map_err(|e| {
455 CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
456 })
457 };
458
459 let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
460 let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
461
462 let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
463 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
464 Ok((layer, counter))
465 }
466
467 fn resolve_language(&self, language: &str) -> Result<Arc<dyn Language>, CamelError> {
468 let guard = self
469 .languages
470 .lock()
471 .expect("mutex poisoned: another thread panicked while holding this lock");
472 guard.get(language).cloned().ok_or_else(|| {
473 CamelError::RouteError(format!(
474 "language `{language}` is not registered in CamelContext"
475 ))
476 })
477 }
478
479 fn compile_language_expression(
480 &self,
481 expression: &LanguageExpressionDef,
482 ) -> Result<Arc<dyn Expression>, CamelError> {
483 let language = self.resolve_language(&expression.language)?;
484 let compiled = language
485 .create_expression(&expression.source)
486 .map_err(|e| {
487 CamelError::RouteError(format!(
488 "failed to compile {} expression `{}`: {e}",
489 expression.language, expression.source
490 ))
491 })?;
492 Ok(Arc::from(compiled))
493 }
494
495 fn compile_language_predicate(
496 &self,
497 expression: &LanguageExpressionDef,
498 ) -> Result<Arc<dyn Predicate>, CamelError> {
499 let language = self.resolve_language(&expression.language)?;
500 let compiled = language.create_predicate(&expression.source).map_err(|e| {
501 CamelError::RouteError(format!(
502 "failed to compile {} predicate `{}`: {e}",
503 expression.language, expression.source
504 ))
505 })?;
506 Ok(Arc::from(compiled))
507 }
508
509 fn compile_filter_predicate(
510 &self,
511 expression: &LanguageExpressionDef,
512 ) -> Result<FilterPredicate, CamelError> {
513 let predicate = self.compile_language_predicate(expression)?;
514 Ok(Arc::new(move |exchange: &Exchange| {
515 predicate.matches(exchange).unwrap_or(false)
516 }))
517 }
518
519 fn value_to_body(value: Value) -> Body {
520 match value {
521 Value::Null => Body::Empty,
522 Value::String(text) => Body::Text(text),
523 other => Body::Json(other),
524 }
525 }
526
527 pub(crate) fn resolve_steps(
529 &self,
530 steps: Vec<BuilderStep>,
531 producer_ctx: &ProducerContext,
532 registry: &Arc<std::sync::Mutex<Registry>>,
533 ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
534 let resolve_producer = |uri: &str| -> Result<BoxProcessor, CamelError> {
535 let parsed = parse_uri(uri)?;
536 let registry_guard = registry
537 .lock()
538 .expect("mutex poisoned: another thread panicked while holding this lock");
539 let component = registry_guard.get_or_err(&parsed.scheme)?;
540 let endpoint = component.create_endpoint(uri)?;
541 endpoint.create_producer(producer_ctx)
542 };
543
544 let mut processors: Vec<(BoxProcessor, Option<camel_api::BodyType>)> = Vec::new();
545 for step in steps {
546 match step {
547 BuilderStep::Processor(svc) => {
548 processors.push((svc, None));
549 }
550 BuilderStep::To(uri) => {
551 let parsed = parse_uri(&uri)?;
552 let registry_guard = registry
553 .lock()
554 .expect("mutex poisoned: another thread panicked while holding this lock");
555 let component = registry_guard.get_or_err(&parsed.scheme)?;
556 let endpoint = component.create_endpoint(&uri)?;
557 let contract = endpoint.body_contract();
558 let producer = endpoint.create_producer(producer_ctx)?;
559 processors.push((producer, contract));
560 }
561 BuilderStep::Stop => {
562 processors.push((BoxProcessor::new(camel_processor::StopService), None));
563 }
564 BuilderStep::Delay { config } => {
565 let svc = camel_processor::delayer::DelayerService::new(config);
566 processors.push((BoxProcessor::new(svc), None));
567 }
568 BuilderStep::Log { level, message } => {
569 let svc = camel_processor::LogProcessor::new(level, message);
570 processors.push((BoxProcessor::new(svc), None));
571 }
572 BuilderStep::DeclarativeSetHeader { key, value } => match value {
573 ValueSourceDef::Literal(value) => {
574 let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
575 processors.push((BoxProcessor::new(svc), None));
576 }
577 ValueSourceDef::Expression(expression) => {
578 let expression = self.compile_language_expression(&expression)?;
579 let svc = camel_processor::DynamicSetHeader::new(
580 IdentityProcessor,
581 key,
582 move |exchange: &Exchange| {
583 expression.evaluate(exchange).unwrap_or(Value::Null)
584 },
585 );
586 processors.push((BoxProcessor::new(svc), None));
587 }
588 },
589 BuilderStep::DeclarativeSetBody { value } => match value {
590 ValueSourceDef::Literal(value) => {
591 let body = Self::value_to_body(value);
592 let svc = camel_processor::SetBody::new(
593 IdentityProcessor,
594 move |_exchange: &Exchange| body.clone(),
595 );
596 processors.push((BoxProcessor::new(svc), None));
597 }
598 ValueSourceDef::Expression(expression) => {
599 let expression = self.compile_language_expression(&expression)?;
600 let svc = camel_processor::SetBody::new(
601 IdentityProcessor,
602 move |exchange: &Exchange| {
603 let value = expression.evaluate(exchange).unwrap_or(Value::Null);
604 Self::value_to_body(value)
605 },
606 );
607 processors.push((BoxProcessor::new(svc), None));
608 }
609 },
610 BuilderStep::DeclarativeFilter { predicate, steps } => {
611 let predicate = self.compile_filter_predicate(&predicate)?;
612 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
613 let sub_processors: Vec<BoxProcessor> =
614 sub_pairs.into_iter().map(|(p, _)| p).collect();
615 let sub_pipeline = compose_pipeline(sub_processors);
616 let svc =
617 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
618 processors.push((BoxProcessor::new(svc), None));
619 }
620 BuilderStep::DeclarativeChoice { whens, otherwise } => {
621 let mut when_clauses = Vec::new();
622 for when_step in whens {
623 let predicate = self.compile_filter_predicate(&when_step.predicate)?;
624 let sub_pairs =
625 self.resolve_steps(when_step.steps, producer_ctx, registry)?;
626 let sub_processors: Vec<BoxProcessor> =
627 sub_pairs.into_iter().map(|(p, _)| p).collect();
628 let pipeline = compose_pipeline(sub_processors);
629 when_clauses.push(WhenClause {
630 predicate,
631 pipeline,
632 });
633 }
634 let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
635 let sub_pairs =
636 self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
637 let sub_processors: Vec<BoxProcessor> =
638 sub_pairs.into_iter().map(|(p, _)| p).collect();
639 Some(compose_pipeline(sub_processors))
640 } else {
641 None
642 };
643 let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
644 processors.push((BoxProcessor::new(svc), None));
645 }
646 BuilderStep::DeclarativeScript { expression } => {
647 let lang = self.resolve_language(&expression.language)?;
648 match lang.create_mutating_expression(&expression.source) {
649 Ok(mut_expr) => {
650 processors
651 .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
652 }
653 Err(LanguageError::NotSupported { .. }) => {
654 let expression = self.compile_language_expression(&expression)?;
662 let svc = camel_processor::SetBody::new(
663 IdentityProcessor,
664 move |exchange: &Exchange| {
665 let value =
666 expression.evaluate(exchange).unwrap_or(Value::Null);
667 Self::value_to_body(value)
668 },
669 );
670 processors.push((BoxProcessor::new(svc), None));
671 }
672 Err(e) => {
673 return Err(CamelError::RouteError(format!(
674 "Failed to create mutating expression for language '{}': {}",
675 expression.language, e
676 )));
677 }
678 }
679 }
680 BuilderStep::Split { config, steps } => {
681 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
682 let sub_processors: Vec<BoxProcessor> =
683 sub_pairs.into_iter().map(|(p, _)| p).collect();
684 let sub_pipeline = compose_pipeline(sub_processors);
685 let splitter =
686 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
687 processors.push((BoxProcessor::new(splitter), None));
688 }
689 BuilderStep::DeclarativeSplit {
690 expression,
691 aggregation,
692 parallel,
693 parallel_limit,
694 stop_on_exception,
695 steps,
696 } => {
697 let lang_expr = self.compile_language_expression(&expression)?;
698 let split_fn = move |exchange: &Exchange| {
699 let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
700 match value {
701 Value::String(s) => s
702 .lines()
703 .filter(|line| !line.is_empty())
704 .map(|line| {
705 let mut fragment = exchange.clone();
706 fragment.input.body = Body::from(line.to_string());
707 fragment
708 })
709 .collect(),
710 Value::Array(arr) => arr
711 .into_iter()
712 .map(|v| {
713 let mut fragment = exchange.clone();
714 fragment.input.body = Body::from(v);
715 fragment
716 })
717 .collect(),
718 _ => vec![exchange.clone()],
719 }
720 };
721
722 let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
723 .aggregation(aggregation)
724 .parallel(parallel)
725 .stop_on_exception(stop_on_exception);
726 if let Some(limit) = parallel_limit {
727 config = config.parallel_limit(limit);
728 }
729
730 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
731 let sub_processors: Vec<BoxProcessor> =
732 sub_pairs.into_iter().map(|(p, _)| p).collect();
733 let sub_pipeline = compose_pipeline(sub_processors);
734 let splitter =
735 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
736 processors.push((BoxProcessor::new(splitter), None));
737 }
738 BuilderStep::Aggregate { config } => {
739 let (late_tx, _late_rx) = mpsc::channel(256);
740 let registry: SharedLanguageRegistry =
741 Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
742 let cancel = CancellationToken::new();
743 let svc =
744 camel_processor::AggregatorService::new(config, late_tx, registry, cancel);
745 processors.push((BoxProcessor::new(svc), None));
746 }
747 BuilderStep::Filter { predicate, steps } => {
748 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
749 let sub_processors: Vec<BoxProcessor> =
750 sub_pairs.into_iter().map(|(p, _)| p).collect();
751 let sub_pipeline = compose_pipeline(sub_processors);
752 let svc =
753 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
754 processors.push((BoxProcessor::new(svc), None));
755 }
756 BuilderStep::Choice { whens, otherwise } => {
757 let mut when_clauses = Vec::new();
759 for when_step in whens {
760 let sub_pairs =
761 self.resolve_steps(when_step.steps, producer_ctx, registry)?;
762 let sub_processors: Vec<BoxProcessor> =
763 sub_pairs.into_iter().map(|(p, _)| p).collect();
764 let pipeline = compose_pipeline(sub_processors);
765 when_clauses.push(WhenClause {
766 predicate: when_step.predicate,
767 pipeline,
768 });
769 }
770 let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
772 let sub_pairs =
773 self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
774 let sub_processors: Vec<BoxProcessor> =
775 sub_pairs.into_iter().map(|(p, _)| p).collect();
776 Some(compose_pipeline(sub_processors))
777 } else {
778 None
779 };
780 let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
781 processors.push((BoxProcessor::new(svc), None));
782 }
783 BuilderStep::WireTap { uri } => {
784 let producer = resolve_producer(&uri)?;
785 let svc = camel_processor::WireTapService::new(producer);
786 processors.push((BoxProcessor::new(svc), None));
787 }
788 BuilderStep::Multicast { config, steps } => {
789 let mut endpoints = Vec::new();
791 for step in steps {
792 let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
793 let sub_processors: Vec<BoxProcessor> =
794 sub_pairs.into_iter().map(|(p, _)| p).collect();
795 let endpoint = compose_pipeline(sub_processors);
796 endpoints.push(endpoint);
797 }
798 let svc = camel_processor::MulticastService::new(endpoints, config);
799 processors.push((BoxProcessor::new(svc), None));
800 }
801 BuilderStep::DeclarativeLog { level, message } => {
802 let ValueSourceDef::Expression(expression) = message else {
803 unreachable!(
806 "DeclarativeLog with Literal should have been compiled to a Processor"
807 );
808 };
809 let expression = self.compile_language_expression(&expression)?;
810 let svc =
811 camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
812 expression
813 .evaluate(exchange)
814 .unwrap_or_else(|e| {
815 warn!(error = %e, "log expression evaluation failed");
816 Value::Null
817 })
818 .to_string()
819 });
820 processors.push((BoxProcessor::new(svc), None));
821 }
822 BuilderStep::Bean { name, method } => {
823 let beans = self.beans.lock().expect(
825 "beans mutex poisoned: another thread panicked while holding this lock",
826 );
827
828 let bean = beans.get(&name).ok_or_else(|| {
830 CamelError::ProcessorError(format!("Bean not found: {}", name))
831 })?;
832
833 let bean_clone = Arc::clone(&bean);
835 let method = method.clone();
836
837 let processor = tower::service_fn(move |mut exchange: Exchange| {
839 let bean = Arc::clone(&bean_clone);
840 let method = method.clone();
841
842 async move {
843 bean.call(&method, &mut exchange).await?;
844 Ok(exchange)
845 }
846 });
847
848 processors.push((BoxProcessor::new(processor), None));
849 }
850 BuilderStep::Script { language, script } => {
851 let lang = self.resolve_language(&language)?;
852 match lang.create_mutating_expression(&script) {
853 Ok(mut_expr) => {
854 processors
855 .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
856 }
857 Err(LanguageError::NotSupported {
858 feature,
859 language: ref lang_name,
860 }) => {
861 return Err(CamelError::RouteError(format!(
864 "Language '{}' does not support {} (required for .script() step)",
865 lang_name, feature
866 )));
867 }
868 Err(e) => {
869 return Err(CamelError::RouteError(format!(
870 "Failed to create mutating expression for language '{}': {}",
871 language, e
872 )));
873 }
874 }
875 }
876 BuilderStep::Throttle { config, steps } => {
877 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
878 let sub_processors: Vec<BoxProcessor> =
879 sub_pairs.into_iter().map(|(p, _)| p).collect();
880 let sub_pipeline = compose_pipeline(sub_processors);
881 let svc =
882 camel_processor::throttler::ThrottlerService::new(config, sub_pipeline);
883 processors.push((BoxProcessor::new(svc), None));
884 }
885 BuilderStep::LoadBalance { config, steps } => {
886 let mut endpoints = Vec::new();
888 for step in steps {
889 let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
890 let sub_processors: Vec<BoxProcessor> =
891 sub_pairs.into_iter().map(|(p, _)| p).collect();
892 let endpoint = compose_pipeline(sub_processors);
893 endpoints.push(endpoint);
894 }
895 let svc =
896 camel_processor::load_balancer::LoadBalancerService::new(endpoints, config);
897 processors.push((BoxProcessor::new(svc), None));
898 }
899 BuilderStep::DynamicRouter { config } => {
900 use camel_api::EndpointResolver;
901
902 let producer_ctx_clone = producer_ctx.clone();
903 let registry_clone = Arc::clone(registry);
904 let resolver: EndpointResolver = Arc::new(move |uri: &str| {
905 let parsed = match parse_uri(uri) {
906 Ok(p) => p,
907 Err(_) => return None,
908 };
909 let registry_guard = match registry_clone.lock() {
910 Ok(g) => g,
911 Err(_) => return None, };
913 let component = match registry_guard.get_or_err(&parsed.scheme) {
914 Ok(c) => c,
915 Err(_) => return None,
916 };
917 let endpoint = match component.create_endpoint(uri) {
918 Ok(e) => e,
919 Err(_) => return None,
920 };
921 let producer = match endpoint.create_producer(&producer_ctx_clone) {
922 Ok(p) => p,
923 Err(_) => return None,
924 };
925 Some(BoxProcessor::new(producer))
926 });
927 let svc = camel_processor::dynamic_router::DynamicRouterService::new(
928 config, resolver,
929 );
930 processors.push((BoxProcessor::new(svc), None));
931 }
932 BuilderStep::DeclarativeDynamicRouter {
933 expression,
934 uri_delimiter,
935 cache_size,
936 ignore_invalid_endpoints,
937 max_iterations,
938 } => {
939 use camel_api::EndpointResolver;
940
941 let expression = self.compile_language_expression(&expression)?;
942 let expression: camel_api::RouterExpression =
943 Arc::new(move |exchange: &Exchange| {
944 let value = expression.evaluate(exchange).unwrap_or(Value::Null);
945 match value {
946 Value::Null => None,
947 Value::String(s) => Some(s),
948 other => Some(other.to_string()),
949 }
950 });
951
952 let config = camel_api::DynamicRouterConfig::new(expression)
956 .uri_delimiter(uri_delimiter)
957 .cache_size(cache_size)
958 .ignore_invalid_endpoints(ignore_invalid_endpoints)
959 .max_iterations(max_iterations);
960
961 let producer_ctx_clone = producer_ctx.clone();
962 let registry_clone = Arc::clone(registry);
963 let resolver: EndpointResolver = Arc::new(move |uri: &str| {
964 let parsed = match parse_uri(uri) {
965 Ok(p) => p,
966 Err(_) => return None,
967 };
968 let registry_guard = match registry_clone.lock() {
969 Ok(g) => g,
970 Err(_) => return None,
971 };
972 let component = match registry_guard.get_or_err(&parsed.scheme) {
973 Ok(c) => c,
974 Err(_) => return None,
975 };
976 let endpoint = match component.create_endpoint(uri) {
977 Ok(e) => e,
978 Err(_) => return None,
979 };
980 let producer = match endpoint.create_producer(&producer_ctx_clone) {
981 Ok(p) => p,
982 Err(_) => return None,
983 };
984 Some(BoxProcessor::new(producer))
985 });
986 let svc = camel_processor::dynamic_router::DynamicRouterService::new(
987 config, resolver,
988 );
989 processors.push((BoxProcessor::new(svc), None));
990 }
991 BuilderStep::RoutingSlip { config } => {
992 use camel_api::EndpointResolver;
993
994 let producer_ctx_clone = producer_ctx.clone();
995 let registry_clone = Arc::clone(registry);
996 let resolver: EndpointResolver = Arc::new(move |uri: &str| {
997 let parsed = match parse_uri(uri) {
998 Ok(p) => p,
999 Err(_) => return None,
1000 };
1001 let registry_guard = match registry_clone.lock() {
1002 Ok(g) => g,
1003 Err(_) => return None,
1004 };
1005 let component = match registry_guard.get_or_err(&parsed.scheme) {
1006 Ok(c) => c,
1007 Err(_) => return None,
1008 };
1009 let endpoint = match component.create_endpoint(uri) {
1010 Ok(e) => e,
1011 Err(_) => return None,
1012 };
1013 let producer = match endpoint.create_producer(&producer_ctx_clone) {
1014 Ok(p) => p,
1015 Err(_) => return None,
1016 };
1017 Some(BoxProcessor::new(producer))
1018 });
1019
1020 let svc =
1021 camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
1022 processors.push((BoxProcessor::new(svc), None));
1023 }
1024 BuilderStep::DeclarativeRoutingSlip {
1025 expression,
1026 uri_delimiter,
1027 cache_size,
1028 ignore_invalid_endpoints,
1029 } => {
1030 use camel_api::EndpointResolver;
1031
1032 let expression = self.compile_language_expression(&expression)?;
1033 let expression: camel_api::RoutingSlipExpression =
1034 Arc::new(move |exchange: &Exchange| {
1035 let value = expression.evaluate(exchange).unwrap_or(Value::Null);
1036 match value {
1037 Value::Null => None,
1038 Value::String(s) => Some(s),
1039 other => Some(other.to_string()),
1040 }
1041 });
1042
1043 let config = camel_api::RoutingSlipConfig::new(expression)
1044 .uri_delimiter(uri_delimiter)
1045 .cache_size(cache_size)
1046 .ignore_invalid_endpoints(ignore_invalid_endpoints);
1047
1048 let producer_ctx_clone = producer_ctx.clone();
1049 let registry_clone = Arc::clone(registry);
1050 let resolver: EndpointResolver = Arc::new(move |uri: &str| {
1051 let parsed = match parse_uri(uri) {
1052 Ok(p) => p,
1053 Err(_) => return None,
1054 };
1055 let registry_guard = match registry_clone.lock() {
1056 Ok(g) => g,
1057 Err(_) => return None,
1058 };
1059 let component = match registry_guard.get_or_err(&parsed.scheme) {
1060 Ok(c) => c,
1061 Err(_) => return None,
1062 };
1063 let endpoint = match component.create_endpoint(uri) {
1064 Ok(e) => e,
1065 Err(_) => return None,
1066 };
1067 let producer = match endpoint.create_producer(&producer_ctx_clone) {
1068 Ok(p) => p,
1069 Err(_) => return None,
1070 };
1071 Some(BoxProcessor::new(producer))
1072 });
1073
1074 let svc =
1075 camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
1076 processors.push((BoxProcessor::new(svc), None));
1077 }
1078 }
1079 }
1080 Ok(processors)
1081 }
1082
1083 pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
1093 let route_id = definition.route_id().to_string();
1094
1095 if self.routes.contains_key(&route_id) {
1096 return Err(CamelError::RouteError(format!(
1097 "Route '{}' already exists",
1098 route_id
1099 )));
1100 }
1101
1102 info!(route_id = %route_id, "Adding route to controller");
1103
1104 let definition_info = definition.to_info();
1106 let RouteDefinition {
1107 from_uri,
1108 steps,
1109 error_handler,
1110 circuit_breaker,
1111 unit_of_work,
1112 concurrency,
1113 ..
1114 } = definition;
1115
1116 let producer_ctx = self.build_producer_context()?;
1118
1119 let mut aggregate_split: Option<AggregateSplitInfo> = None;
1121 let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
1122 Some((idx, agg_config)) => {
1123 let mut pre_steps = steps;
1124 let mut rest = pre_steps.split_off(idx);
1125 let _agg_step = rest.remove(0);
1126 let post_steps = rest;
1127
1128 let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
1129 let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
1130 let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
1131 compose_pipeline(pre_procs),
1132 )));
1133
1134 let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
1135 let post_procs: Vec<BoxProcessor> =
1136 post_pairs.into_iter().map(|(p, _)| p).collect();
1137 let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
1138 compose_pipeline(post_procs),
1139 )));
1140
1141 aggregate_split = Some(AggregateSplitInfo {
1142 pre_pipeline,
1143 agg_config,
1144 post_pipeline,
1145 });
1146
1147 vec![]
1148 }
1149 None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
1150 };
1151 let route_id_for_tracing = route_id.clone();
1152 let mut pipeline = if processors_with_contracts.is_empty() {
1153 BoxProcessor::new(IdentityProcessor)
1154 } else {
1155 compose_traced_pipeline_with_contracts(
1156 processors_with_contracts,
1157 &route_id_for_tracing,
1158 self.tracing_enabled,
1159 self.tracer_detail_level.clone(),
1160 self.tracer_metrics.clone(),
1161 )
1162 };
1163
1164 if let Some(cb_config) = circuit_breaker {
1166 let cb_layer = CircuitBreakerLayer::new(cb_config);
1167 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
1168 }
1169
1170 let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
1172
1173 if let Some(config) = eh_config {
1174 let registry = self
1176 .registry
1177 .lock()
1178 .expect("mutex poisoned: another thread panicked while holding this lock");
1179 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
1180 pipeline = BoxProcessor::new(layer.layer(pipeline));
1181 }
1182
1183 let uow_counter = if let Some(uow_config) = &unit_of_work {
1185 let registry = self
1186 .registry
1187 .lock()
1188 .expect("mutex poisoned: registry lock in add_route uow");
1189 let (uow_layer, counter) =
1190 self.resolve_uow_layer(uow_config, &producer_ctx, ®istry, None)?;
1191 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1192 Some(counter)
1193 } else {
1194 None
1195 };
1196
1197 self.routes.insert(
1198 route_id.clone(),
1199 ManagedRoute {
1200 definition: definition_info,
1201 from_uri,
1202 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
1203 concurrency,
1204 consumer_handle: None,
1205 pipeline_handle: None,
1206 consumer_cancel_token: CancellationToken::new(),
1207 pipeline_cancel_token: CancellationToken::new(),
1208 channel_sender: None,
1209 in_flight: uow_counter,
1210 aggregate_split,
1211 agg_service: None,
1212 },
1213 );
1214
1215 Ok(())
1216 }
1217
1218 pub fn compile_route_definition(
1223 &self,
1224 def: RouteDefinition,
1225 ) -> Result<BoxProcessor, CamelError> {
1226 let route_id = def.route_id().to_string();
1227
1228 let producer_ctx = self.build_producer_context()?;
1229
1230 let processors_with_contracts =
1231 self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
1232 let mut pipeline = compose_traced_pipeline_with_contracts(
1233 processors_with_contracts,
1234 &route_id,
1235 self.tracing_enabled,
1236 self.tracer_detail_level.clone(),
1237 self.tracer_metrics.clone(),
1238 );
1239
1240 if let Some(cb_config) = def.circuit_breaker {
1241 let cb_layer = CircuitBreakerLayer::new(cb_config);
1242 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
1243 }
1244
1245 let eh_config = def
1246 .error_handler
1247 .clone()
1248 .or_else(|| self.global_error_handler.clone());
1249 if let Some(config) = eh_config {
1250 let registry = self
1252 .registry
1253 .lock()
1254 .expect("mutex poisoned: registry lock in compile_route_definition");
1255 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
1256 pipeline = BoxProcessor::new(layer.layer(pipeline));
1257 }
1258
1259 if let Some(uow_config) = &def.unit_of_work {
1261 let existing_counter = self
1262 .routes
1263 .get(&route_id)
1264 .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
1265
1266 let registry = self
1267 .registry
1268 .lock()
1269 .expect("mutex poisoned: registry lock in compile_route_definition uow");
1270
1271 let (uow_layer, _counter) =
1272 self.resolve_uow_layer(uow_config, &producer_ctx, ®istry, existing_counter)?;
1273
1274 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1275 }
1276
1277 Ok(pipeline)
1278 }
1279
1280 pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1286 let managed = self.routes.get(route_id).ok_or_else(|| {
1287 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
1288 })?;
1289 if handle_is_running(&managed.consumer_handle)
1290 || handle_is_running(&managed.pipeline_handle)
1291 {
1292 return Err(CamelError::RouteError(format!(
1293 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
1294 route_id,
1295 inferred_lifecycle_label(managed)
1296 )));
1297 }
1298 self.routes.remove(route_id);
1299 info!(route_id = %route_id, "Route removed from controller");
1300 Ok(())
1301 }
1302
1303 pub fn route_count(&self) -> usize {
1305 self.routes.len()
1306 }
1307
1308 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1309 self.routes.get(route_id).map(|r| {
1310 r.in_flight
1311 .as_ref()
1312 .map_or(0, |c| c.load(Ordering::Relaxed))
1313 })
1314 }
1315
1316 pub fn route_exists(&self, route_id: &str) -> bool {
1318 self.routes.contains_key(route_id)
1319 }
1320
1321 pub fn route_ids(&self) -> Vec<String> {
1323 self.routes.keys().cloned().collect()
1324 }
1325
1326 pub fn auto_startup_route_ids(&self) -> Vec<String> {
1328 let mut pairs: Vec<(String, i32)> = self
1329 .routes
1330 .iter()
1331 .filter(|(_, managed)| managed.definition.auto_startup())
1332 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1333 .collect();
1334 pairs.sort_by_key(|(_, order)| *order);
1335 pairs.into_iter().map(|(id, _)| id).collect()
1336 }
1337
1338 pub fn shutdown_route_ids(&self) -> Vec<String> {
1340 let mut pairs: Vec<(String, i32)> = self
1341 .routes
1342 .iter()
1343 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1344 .collect();
1345 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1346 pairs.into_iter().map(|(id, _)| id).collect()
1347 }
1348
1349 pub fn swap_pipeline(
1354 &self,
1355 route_id: &str,
1356 new_pipeline: BoxProcessor,
1357 ) -> Result<(), CamelError> {
1358 let managed = self
1359 .routes
1360 .get(route_id)
1361 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1362
1363 if managed.aggregate_split.is_some() {
1364 tracing::warn!(
1365 route_id = %route_id,
1366 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
1367 );
1368 }
1369
1370 managed
1371 .pipeline
1372 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1373 info!(route_id = %route_id, "Pipeline swapped atomically");
1374 Ok(())
1375 }
1376
1377 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1379 self.routes.get(route_id).map(|r| r.from_uri.clone())
1380 }
1381
1382 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1387 self.routes
1388 .get(route_id)
1389 .map(|r| r.pipeline.load().0.clone())
1390 }
1391
1392 async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1394 let managed = self
1395 .routes
1396 .get_mut(route_id)
1397 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1398
1399 if !handle_is_running(&managed.consumer_handle)
1400 && !handle_is_running(&managed.pipeline_handle)
1401 {
1402 return Ok(());
1403 }
1404
1405 info!(route_id = %route_id, "Stopping route");
1406
1407 let managed = self
1409 .routes
1410 .get_mut(route_id)
1411 .expect("invariant: route must exist after prior existence check");
1412 managed.consumer_cancel_token.cancel();
1413
1414 let managed = self
1416 .routes
1417 .get_mut(route_id)
1418 .expect("invariant: route must exist after prior existence check");
1419 if let Some(agg_svc) = &managed.agg_service {
1420 let guard = agg_svc.lock().unwrap();
1421 guard.force_complete_all();
1422 }
1423
1424 let managed = self
1425 .routes
1426 .get_mut(route_id)
1427 .expect("invariant: route must exist after prior existence check");
1428 managed.pipeline_cancel_token.cancel();
1429
1430 let managed = self
1432 .routes
1433 .get_mut(route_id)
1434 .expect("invariant: route must exist after prior existence check");
1435 let consumer_handle = managed.consumer_handle.take();
1436 let pipeline_handle = managed.pipeline_handle.take();
1437
1438 let managed = self
1441 .routes
1442 .get_mut(route_id)
1443 .expect("invariant: route must exist after prior existence check");
1444 managed.channel_sender = None;
1445
1446 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1450 match (consumer_handle, pipeline_handle) {
1451 (Some(c), Some(p)) => {
1452 let _ = tokio::join!(c, p);
1453 }
1454 (Some(c), None) => {
1455 let _ = c.await;
1456 }
1457 (None, Some(p)) => {
1458 let _ = p.await;
1459 }
1460 (None, None) => {}
1461 }
1462 })
1463 .await;
1464
1465 if timeout_result.is_err() {
1466 warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
1467 }
1468
1469 let managed = self
1471 .routes
1472 .get_mut(route_id)
1473 .expect("invariant: route must exist after prior existence check");
1474
1475 managed.consumer_cancel_token = CancellationToken::new();
1477 managed.pipeline_cancel_token = CancellationToken::new();
1478
1479 info!(route_id = %route_id, "Route stopped");
1480 Ok(())
1481 }
1482}
1483
1484#[async_trait::async_trait]
1485impl RouteController for DefaultRouteController {
1486 async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1487 {
1489 let managed = self
1490 .routes
1491 .get_mut(route_id)
1492 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1493
1494 let consumer_running = handle_is_running(&managed.consumer_handle);
1495 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1496 if consumer_running && pipeline_running {
1497 return Ok(());
1498 }
1499 if !consumer_running && pipeline_running {
1500 return Err(CamelError::RouteError(format!(
1501 "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
1502 route_id
1503 )));
1504 }
1505 if consumer_running && !pipeline_running {
1506 return Err(CamelError::RouteError(format!(
1507 "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
1508 route_id
1509 )));
1510 }
1511 }
1512
1513 info!(route_id = %route_id, "Starting route");
1514
1515 let (from_uri, pipeline, concurrency) = {
1517 let managed = self
1518 .routes
1519 .get(route_id)
1520 .expect("invariant: route must exist after prior existence check");
1521 (
1522 managed.from_uri.clone(),
1523 Arc::clone(&managed.pipeline),
1524 managed.concurrency.clone(),
1525 )
1526 };
1527
1528 let crash_notifier = self.crash_notifier.clone();
1530 let runtime_for_consumer = self.runtime.clone();
1531
1532 let parsed = parse_uri(&from_uri)?;
1534 let registry = self
1535 .registry
1536 .lock()
1537 .expect("mutex poisoned: another thread panicked while holding this lock");
1538 let component = registry.get_or_err(&parsed.scheme)?;
1539 let endpoint = component.create_endpoint(&from_uri)?;
1540 let mut consumer = endpoint.create_consumer()?;
1541 let consumer_concurrency = consumer.concurrency_model();
1542 drop(registry);
1544
1545 let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1547
1548 let managed = self
1550 .routes
1551 .get_mut(route_id)
1552 .expect("invariant: route must exist after prior existence check");
1553
1554 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1556 let consumer_cancel = managed.consumer_cancel_token.child_token();
1558 let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1559 let tx_for_storage = tx.clone();
1561 let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1562
1563 let managed = self
1565 .routes
1566 .get_mut(route_id)
1567 .expect("invariant: route must exist after prior existence check");
1568
1569 if let Some(split) = managed.aggregate_split.as_ref() {
1570 let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
1571
1572 let route_cancel_clone = pipeline_cancel.clone();
1573 let svc = AggregatorService::new(
1574 split.agg_config.clone(),
1575 late_tx,
1576 Arc::clone(&self.languages),
1577 route_cancel_clone,
1578 );
1579 let agg = Arc::new(std::sync::Mutex::new(svc));
1580
1581 managed.agg_service = Some(Arc::clone(&agg));
1582
1583 let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
1584 let pre_pipeline = Arc::clone(&split.pre_pipeline);
1585 let post_pipeline = Arc::clone(&split.post_pipeline);
1586
1587 let route_id_for_consumer = route_id.to_string();
1589 let consumer_handle = tokio::spawn(async move {
1590 if let Err(e) = consumer.start(consumer_ctx).await {
1591 error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
1592 let error_msg = e.to_string();
1593 if let Some(tx) = crash_notifier {
1594 let _ = tx
1595 .send(CrashNotification {
1596 route_id: route_id_for_consumer.clone(),
1597 error: error_msg.clone(),
1598 })
1599 .await;
1600 }
1601 publish_runtime_failure(
1602 runtime_for_consumer,
1603 &route_id_for_consumer,
1604 &error_msg,
1605 )
1606 .await;
1607 }
1608 });
1609
1610 let pipeline_handle = tokio::spawn(async move {
1612 loop {
1613 tokio::select! {
1614 biased;
1615
1616 late_ex = async {
1617 let mut rx = late_rx.lock().await;
1618 rx.recv().await
1619 } => {
1620 match late_ex {
1621 Some(ex) => {
1622 let pipe = post_pipeline.load();
1623 if let Err(e) = pipe.0.clone().oneshot(ex).await {
1624 tracing::warn!(error = %e, "late exchange post-pipeline failed");
1625 }
1626 }
1627 None => return,
1628 }
1629 }
1630
1631 envelope_opt = rx.recv() => {
1632 match envelope_opt {
1633 Some(envelope) => {
1634 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1635 let pre_pipe = pre_pipeline.load();
1636 let ex = match pre_pipe.0.clone().oneshot(exchange).await {
1637 Ok(ex) => ex,
1638 Err(e) => {
1639 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1640 continue;
1641 }
1642 };
1643
1644 let ex = {
1645 let cloned_svc = agg.lock().unwrap().clone();
1646 cloned_svc.oneshot(ex).await
1647 };
1648
1649 match ex {
1650 Ok(ex) => {
1651 if !is_pending(&ex) {
1652 let post_pipe = post_pipeline.load();
1653 let out = post_pipe.0.clone().oneshot(ex).await;
1654 if let Some(tx) = reply_tx { let _ = tx.send(out); }
1655 } else if let Some(tx) = reply_tx {
1656 let _ = tx.send(Ok(ex));
1657 }
1658 }
1659 Err(e) => {
1660 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1661 }
1662 }
1663 }
1664 None => return,
1665 }
1666 }
1667
1668 _ = pipeline_cancel.cancelled() => {
1669 {
1670 let guard = agg.lock().unwrap();
1671 guard.force_complete_all();
1672 }
1673 let mut rx_guard = late_rx.lock().await;
1674 while let Ok(late_ex) = rx_guard.try_recv() {
1675 let pipe = post_pipeline.load();
1676 let _ = pipe.0.clone().oneshot(late_ex).await;
1677 }
1678 break;
1679 }
1680 }
1681 }
1682 });
1683
1684 let managed = self
1685 .routes
1686 .get_mut(route_id)
1687 .expect("invariant: route must exist");
1688 managed.consumer_handle = Some(consumer_handle);
1689 managed.pipeline_handle = Some(pipeline_handle);
1690 managed.channel_sender = Some(tx_for_storage);
1691
1692 info!(route_id = %route_id, "Route started (aggregate with timeout)");
1693 return Ok(());
1694 }
1695 let route_id_for_consumer = route_id.to_string();
1699 let consumer_handle = tokio::spawn(async move {
1700 if let Err(e) = consumer.start(consumer_ctx).await {
1701 error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
1702 let error_msg = e.to_string();
1703
1704 if let Some(tx) = crash_notifier {
1706 let _ = tx
1707 .send(CrashNotification {
1708 route_id: route_id_for_consumer.clone(),
1709 error: error_msg.clone(),
1710 })
1711 .await;
1712 }
1713
1714 publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1715 .await;
1716 }
1717 });
1718
1719 let pipeline_handle = match effective_concurrency {
1721 ConcurrencyModel::Sequential => {
1722 tokio::spawn(async move {
1723 loop {
1724 let envelope = tokio::select! {
1726 envelope = rx.recv() => match envelope {
1727 Some(e) => e,
1728 None => return, },
1730 _ = pipeline_cancel.cancelled() => {
1731 return;
1733 }
1734 };
1735 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1736
1737 let mut pipeline = pipeline.load().0.clone();
1739
1740 if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1741 if let Some(tx) = reply_tx {
1742 let _ = tx.send(Err(e));
1743 }
1744 return;
1745 }
1746
1747 let result = pipeline.call(exchange).await;
1748 if let Some(tx) = reply_tx {
1749 let _ = tx.send(result);
1750 } else if let Err(ref e) = result
1751 && !matches!(e, CamelError::Stopped)
1752 {
1753 error!("Pipeline error: {e}");
1754 }
1755 }
1756 })
1757 }
1758 ConcurrencyModel::Concurrent { max } => {
1759 let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1760 tokio::spawn(async move {
1761 loop {
1762 let envelope = tokio::select! {
1764 envelope = rx.recv() => match envelope {
1765 Some(e) => e,
1766 None => return, },
1768 _ = pipeline_cancel.cancelled() => {
1769 return;
1771 }
1772 };
1773 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1774 let pipe_ref = Arc::clone(&pipeline);
1775 let sem = sem.clone();
1776 let cancel = pipeline_cancel.clone();
1777 tokio::spawn(async move {
1778 let _permit = match &sem {
1780 Some(s) => Some(s.acquire().await.expect("semaphore closed")),
1781 None => None,
1782 };
1783
1784 let mut pipe = pipe_ref.load().0.clone();
1786
1787 if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1789 if let Some(tx) = reply_tx {
1790 let _ = tx.send(Err(e));
1791 }
1792 return;
1793 }
1794
1795 let result = pipe.call(exchange).await;
1796 if let Some(tx) = reply_tx {
1797 let _ = tx.send(result);
1798 } else if let Err(ref e) = result
1799 && !matches!(e, CamelError::Stopped)
1800 {
1801 error!("Pipeline error: {e}");
1802 }
1803 });
1804 }
1805 })
1806 }
1807 };
1808
1809 let managed = self
1811 .routes
1812 .get_mut(route_id)
1813 .expect("invariant: route must exist after prior existence check");
1814 managed.consumer_handle = Some(consumer_handle);
1815 managed.pipeline_handle = Some(pipeline_handle);
1816 managed.channel_sender = Some(tx_for_storage);
1817
1818 info!(route_id = %route_id, "Route started");
1819 Ok(())
1820 }
1821
1822 async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1823 self.stop_route_internal(route_id).await
1824 }
1825
1826 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1827 self.stop_route(route_id).await?;
1828 tokio::time::sleep(Duration::from_millis(100)).await;
1829 self.start_route(route_id).await
1830 }
1831
1832 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1833 let managed = self
1835 .routes
1836 .get_mut(route_id)
1837 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1838
1839 let consumer_running = handle_is_running(&managed.consumer_handle);
1840 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1841
1842 if !consumer_running || !pipeline_running {
1844 return Err(CamelError::RouteError(format!(
1845 "Cannot suspend route '{}' with execution lifecycle {}",
1846 route_id,
1847 inferred_lifecycle_label(managed)
1848 )));
1849 }
1850
1851 info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1852
1853 let managed = self
1855 .routes
1856 .get_mut(route_id)
1857 .expect("invariant: route must exist after prior existence check");
1858 managed.consumer_cancel_token.cancel();
1859
1860 let managed = self
1862 .routes
1863 .get_mut(route_id)
1864 .expect("invariant: route must exist after prior existence check");
1865 let consumer_handle = managed.consumer_handle.take();
1866
1867 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1869 if let Some(handle) = consumer_handle {
1870 let _ = handle.await;
1871 }
1872 })
1873 .await;
1874
1875 if timeout_result.is_err() {
1876 warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1877 }
1878
1879 let managed = self
1881 .routes
1882 .get_mut(route_id)
1883 .expect("invariant: route must exist after prior existence check");
1884
1885 managed.consumer_cancel_token = CancellationToken::new();
1887
1888 info!(route_id = %route_id, "Route suspended (pipeline still running)");
1889 Ok(())
1890 }
1891
1892 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1893 let managed = self
1895 .routes
1896 .get(route_id)
1897 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1898
1899 let consumer_running = handle_is_running(&managed.consumer_handle);
1900 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1901 if consumer_running || !pipeline_running {
1902 return Err(CamelError::RouteError(format!(
1903 "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1904 route_id,
1905 inferred_lifecycle_label(managed)
1906 )));
1907 }
1908
1909 let sender = managed.channel_sender.clone().ok_or_else(|| {
1911 CamelError::RouteError("Suspended route has no channel sender".into())
1912 })?;
1913
1914 let from_uri = managed.from_uri.clone();
1916
1917 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1918
1919 let parsed = parse_uri(&from_uri)?;
1921 let registry = self
1922 .registry
1923 .lock()
1924 .expect("mutex poisoned: another thread panicked while holding this lock");
1925 let component = registry.get_or_err(&parsed.scheme)?;
1926 let endpoint = component.create_endpoint(&from_uri)?;
1927 let mut consumer = endpoint.create_consumer()?;
1928 drop(registry);
1930
1931 let managed = self
1933 .routes
1934 .get_mut(route_id)
1935 .expect("invariant: route must exist after prior existence check");
1936
1937 let consumer_cancel = managed.consumer_cancel_token.child_token();
1939
1940 let crash_notifier = self.crash_notifier.clone();
1941 let runtime_for_consumer = self.runtime.clone();
1942
1943 let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1945
1946 let route_id_for_consumer = route_id.to_string();
1948 let consumer_handle = tokio::spawn(async move {
1949 if let Err(e) = consumer.start(consumer_ctx).await {
1950 error!(route_id = %route_id_for_consumer, "Consumer error on resume: {e}");
1951 let error_msg = e.to_string();
1952
1953 if let Some(tx) = crash_notifier {
1955 let _ = tx
1956 .send(CrashNotification {
1957 route_id: route_id_for_consumer.clone(),
1958 error: error_msg.clone(),
1959 })
1960 .await;
1961 }
1962
1963 publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1964 .await;
1965 }
1966 });
1967
1968 let managed = self
1970 .routes
1971 .get_mut(route_id)
1972 .expect("invariant: route must exist after prior existence check");
1973 managed.consumer_handle = Some(consumer_handle);
1974
1975 info!(route_id = %route_id, "Route resumed");
1976 Ok(())
1977 }
1978
1979 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1980 let route_ids: Vec<String> = {
1983 let mut pairs: Vec<_> = self
1984 .routes
1985 .iter()
1986 .filter(|(_, r)| r.definition.auto_startup())
1987 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1988 .collect();
1989 pairs.sort_by_key(|(_, order)| *order);
1990 pairs.into_iter().map(|(id, _)| id).collect()
1991 };
1992
1993 info!("Starting {} auto-startup routes", route_ids.len());
1994
1995 let mut errors: Vec<String> = Vec::new();
1997 for route_id in route_ids {
1998 if let Err(e) = self.start_route(&route_id).await {
1999 errors.push(format!("Route '{}': {}", route_id, e));
2000 }
2001 }
2002
2003 if !errors.is_empty() {
2004 return Err(CamelError::RouteError(format!(
2005 "Failed to start routes: {}",
2006 errors.join(", ")
2007 )));
2008 }
2009
2010 info!("All auto-startup routes started");
2011 Ok(())
2012 }
2013
2014 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
2015 let route_ids: Vec<String> = {
2017 let mut pairs: Vec<_> = self
2018 .routes
2019 .iter()
2020 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
2021 .collect();
2022 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
2023 pairs.into_iter().map(|(id, _)| id).collect()
2024 };
2025
2026 info!("Stopping {} routes", route_ids.len());
2027
2028 for route_id in route_ids {
2029 let _ = self.stop_route(&route_id).await;
2030 }
2031
2032 info!("All routes stopped");
2033 Ok(())
2034 }
2035}
2036
2037#[async_trait::async_trait]
2038impl RouteControllerInternal for DefaultRouteController {
2039 fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
2040 DefaultRouteController::add_route(self, def)
2041 }
2042
2043 fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError> {
2044 DefaultRouteController::swap_pipeline(self, route_id, pipeline)
2045 }
2046
2047 fn route_from_uri(&self, route_id: &str) -> Option<String> {
2048 DefaultRouteController::route_from_uri(self, route_id)
2050 }
2051
2052 fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
2053 DefaultRouteController::set_error_handler(self, config)
2054 }
2055
2056 fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
2057 DefaultRouteController::set_self_ref(self, self_ref)
2058 }
2059
2060 fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
2061 DefaultRouteController::set_runtime_handle(self, runtime)
2062 }
2063
2064 fn route_count(&self) -> usize {
2065 DefaultRouteController::route_count(self)
2066 }
2067
2068 fn in_flight_count(&self, route_id: &str) -> Option<u64> {
2069 DefaultRouteController::in_flight_count(self, route_id)
2070 }
2071
2072 fn route_exists(&self, route_id: &str) -> bool {
2073 DefaultRouteController::route_exists(self, route_id)
2074 }
2075
2076 fn route_ids(&self) -> Vec<String> {
2077 DefaultRouteController::route_ids(self)
2078 }
2079
2080 fn auto_startup_route_ids(&self) -> Vec<String> {
2081 DefaultRouteController::auto_startup_route_ids(self)
2082 }
2083
2084 fn shutdown_route_ids(&self) -> Vec<String> {
2085 DefaultRouteController::shutdown_route_ids(self)
2086 }
2087
2088 fn set_tracer_config(&mut self, config: &TracerConfig) {
2089 DefaultRouteController::set_tracer_config(self, config)
2090 }
2091
2092 fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError> {
2093 DefaultRouteController::compile_route_definition(self, def)
2094 }
2095
2096 fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
2097 DefaultRouteController::remove_route(self, route_id)
2098 }
2099
2100 async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
2101 DefaultRouteController::start_route(self, route_id).await
2102 }
2103
2104 async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
2105 DefaultRouteController::stop_route(self, route_id).await
2106 }
2107}
2108
2109#[cfg(test)]
2110mod tests {
2111 use super::*;
2112 use crate::shared::components::domain::Registry;
2113
2114 fn build_controller() -> DefaultRouteController {
2115 DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())))
2116 }
2117
2118 fn build_controller_with_components() -> DefaultRouteController {
2119 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2120 {
2121 let mut guard = registry.lock().expect("registry lock");
2122 guard.register(camel_component_timer::TimerComponent::new());
2123 guard.register(camel_component_mock::MockComponent::new());
2124 guard.register(camel_component_log::LogComponent::new());
2125 }
2126 DefaultRouteController::new(registry)
2127 }
2128
2129 fn set_self_ref(controller: &mut DefaultRouteController) {
2130 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2131 let other: Arc<Mutex<dyn RouteController>> =
2132 Arc::new(Mutex::new(DefaultRouteController::new(registry)));
2133 controller.set_self_ref(other);
2134 }
2135
2136 fn register_simple_language(controller: &mut DefaultRouteController) {
2137 controller.languages.lock().expect("languages lock").insert(
2138 "simple".into(),
2139 Arc::new(camel_language_simple::SimpleLanguage),
2140 );
2141 }
2142
2143 #[test]
2144 fn test_route_controller_internal_is_object_safe() {
2145 let _: Option<Box<dyn RouteControllerInternal>> = None;
2146 }
2147
2148 #[test]
2149 fn helper_functions_cover_non_async_branches() {
2150 let managed = ManagedRoute {
2151 definition: RouteDefinition::new("timer:a", vec![])
2152 .with_route_id("r")
2153 .to_info(),
2154 from_uri: "timer:a".into(),
2155 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(BoxProcessor::new(
2156 IdentityProcessor,
2157 )))),
2158 concurrency: None,
2159 consumer_handle: None,
2160 pipeline_handle: None,
2161 consumer_cancel_token: CancellationToken::new(),
2162 pipeline_cancel_token: CancellationToken::new(),
2163 channel_sender: None,
2164 in_flight: None,
2165 aggregate_split: None,
2166 agg_service: None,
2167 };
2168
2169 assert_eq!(inferred_lifecycle_label(&managed), "Stopped");
2170 assert!(!handle_is_running(&managed.consumer_handle));
2171
2172 let cmd = runtime_failure_command("route-x", "boom");
2173 match cmd {
2174 RuntimeCommand::FailRoute {
2175 route_id, error, ..
2176 } => {
2177 assert_eq!(route_id, "route-x");
2178 assert_eq!(error, "boom");
2179 }
2180 _ => panic!("expected FailRoute command"),
2181 }
2182 }
2183
2184 #[test]
2185 fn add_route_detects_duplicates() {
2186 let mut controller = build_controller();
2187 set_self_ref(&mut controller);
2188
2189 controller
2190 .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
2191 .expect("add route");
2192
2193 let dup_err = controller
2194 .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
2195 .expect_err("duplicate must fail");
2196 assert!(dup_err.to_string().contains("already exists"));
2197 }
2198
/// Exercises the read-only helpers of the controller after three routes have
/// been added: count, from-URI lookup, id listing, and the startup/shutdown
/// orderings.
#[test]
fn route_introspection_and_ordering_helpers_work() {
    let mut ctrl = build_controller();
    set_self_ref(&mut ctrl);

    // Startup orders are deliberately not in insertion order; "c" opts out of
    // auto-startup.
    let definitions = vec![
        RouteDefinition::new("timer:a", vec![])
            .with_route_id("a")
            .with_startup_order(20),
        RouteDefinition::new("timer:b", vec![])
            .with_route_id("b")
            .with_startup_order(10),
        RouteDefinition::new("timer:c", vec![])
            .with_route_id("c")
            .with_auto_startup(false)
            .with_startup_order(5),
    ];
    for definition in definitions {
        ctrl.add_route(definition).unwrap();
    }

    assert_eq!(ctrl.route_count(), 3);
    assert_eq!(ctrl.route_from_uri("a"), Some("timer:a".into()));

    let known_ids = ctrl.route_ids();
    assert!(known_ids.contains(&String::from("a")));

    // Only the auto-startup routes are expected, lowest startup order first.
    let expected_startup = vec![String::from("b"), String::from("a")];
    assert_eq!(ctrl.auto_startup_route_ids(), expected_startup);

    // For these inputs the shutdown list is expected as a (20), b (10), c (5).
    let expected_shutdown = vec![String::from("a"), String::from("b"), String::from("c")];
    assert_eq!(ctrl.shutdown_route_ids(), expected_shutdown);
}
2239
/// Swapping the pipeline of an existing route succeeds and leaves a pipeline
/// in place; removing the route works once and fails with "not found" the
/// second time.
#[test]
fn swap_pipeline_and_remove_route_behaviors() {
    const ROUTE_ID: &str = "swap";

    let mut ctrl = build_controller();
    set_self_ref(&mut ctrl);

    let definition = RouteDefinition::new("timer:a", vec![]).with_route_id(ROUTE_ID);
    ctrl.add_route(definition).unwrap();

    // Replace the route's pipeline with a no-op processor.
    let replacement = BoxProcessor::new(IdentityProcessor);
    ctrl.swap_pipeline(ROUTE_ID, replacement).unwrap();
    let pipeline_after_swap = ctrl.get_pipeline(ROUTE_ID);
    assert!(pipeline_after_swap.is_some());

    // First removal empties the controller.
    ctrl.remove_route(ROUTE_ID).unwrap();
    assert_eq!(ctrl.route_count(), 0);

    // Removing the same id again must be reported as an error.
    let second_removal = ctrl.remove_route(ROUTE_ID);
    let failure = second_removal.expect_err("missing route must fail");
    assert!(failure.to_string().contains("not found"));
}
2262
/// Pushes one instance of a broad range of `BuilderStep` variants through
/// `resolve_steps` in a single call and expects resolution to succeed with a
/// non-empty result.
#[test]
fn resolve_steps_covers_declarative_and_eip_variants() {
    use crate::lifecycle::application::route_definition::{DeclarativeWhenStep, WhenStep};
    use camel_api::LanguageExpressionDef;
    use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
    use camel_api::{
        DynamicRouterConfig, LoadBalancerConfig, MulticastConfig, RoutingSlipConfig,
        ThrottlerConfig,
    };
    use camel_processor::LogLevel;

    let mut ctrl = build_controller_with_components();
    set_self_ref(&mut ctrl);
    register_simple_language(&mut ctrl);

    // Shorthand for an expression definition in the "simple" language.
    let simple = |text: &str| LanguageExpressionDef {
        language: "simple".into(),
        source: text.into(),
    };

    let steps = vec![
        // --- basic steps ---
        BuilderStep::To("mock:out".into()),
        BuilderStep::Stop,
        BuilderStep::Log {
            level: LogLevel::Info,
            message: "log".into(),
        },
        // --- declarative (expression-backed) steps ---
        BuilderStep::DeclarativeSetHeader {
            key: "k".into(),
            value: ValueSourceDef::Literal(Value::String("v".into())),
        },
        BuilderStep::DeclarativeSetHeader {
            key: "k2".into(),
            value: ValueSourceDef::Expression(simple("${body}")),
        },
        BuilderStep::DeclarativeSetBody {
            value: ValueSourceDef::Expression(simple("${body}")),
        },
        BuilderStep::DeclarativeFilter {
            predicate: simple("${body} != null"),
            steps: vec![BuilderStep::Stop],
        },
        BuilderStep::DeclarativeChoice {
            whens: vec![DeclarativeWhenStep {
                predicate: simple("${body} == 'x'"),
                steps: vec![BuilderStep::Stop],
            }],
            otherwise: Some(vec![BuilderStep::Stop]),
        },
        BuilderStep::DeclarativeScript {
            expression: simple("${body}"),
        },
        // --- split / aggregate ---
        BuilderStep::Split {
            config: SplitterConfig::new(split_body_lines())
                .aggregation(AggregationStrategy::CollectAll),
            steps: vec![BuilderStep::Stop],
        },
        BuilderStep::DeclarativeSplit {
            expression: simple("${body}"),
            aggregation: AggregationStrategy::Original,
            parallel: false,
            parallel_limit: Some(2),
            stop_on_exception: true,
            steps: vec![BuilderStep::Stop],
        },
        BuilderStep::Aggregate {
            config: camel_api::AggregatorConfig::correlate_by("id")
                .complete_when_size(1)
                .build(),
        },
        // --- closure-based filter / choice ---
        BuilderStep::Filter {
            predicate: Arc::new(|_| true),
            steps: vec![BuilderStep::Stop],
        },
        BuilderStep::Choice {
            whens: vec![WhenStep {
                predicate: Arc::new(|_| true),
                steps: vec![BuilderStep::Stop],
            }],
            otherwise: Some(vec![BuilderStep::Stop]),
        },
        // --- routing-oriented steps ---
        BuilderStep::WireTap {
            uri: "mock:tap".into(),
        },
        BuilderStep::Multicast {
            steps: vec![
                BuilderStep::To("mock:m1".into()),
                BuilderStep::To("mock:m2".into()),
            ],
            config: MulticastConfig::new(),
        },
        BuilderStep::DeclarativeLog {
            level: LogLevel::Info,
            message: ValueSourceDef::Expression(simple("${body}")),
        },
        BuilderStep::Throttle {
            config: ThrottlerConfig::new(10, Duration::from_millis(100)),
            steps: vec![BuilderStep::To("mock:t".into())],
        },
        BuilderStep::LoadBalance {
            config: LoadBalancerConfig::round_robin(),
            steps: vec![
                BuilderStep::To("mock:l1".into()),
                BuilderStep::To("mock:l2".into()),
            ],
        },
        BuilderStep::DynamicRouter {
            config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
        },
        BuilderStep::RoutingSlip {
            config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
        },
    ];

    let ctx = ProducerContext::new();
    let outcome = ctrl.resolve_steps(steps, &ctx, &ctrl.registry);
    let resolved = outcome.expect("resolve should succeed");
    assert!(!resolved.is_empty());
}
2379
/// Checks three failure paths of `resolve_steps`: a `Script` step in the
/// "simple" language, a `Bean` step naming an unknown bean, and a
/// `DeclarativeScript` step naming an unregistered language.
#[test]
fn resolve_steps_script_requires_mutating_language_support() {
    use camel_api::LanguageExpressionDef;

    let mut ctrl = build_controller_with_components();
    set_self_ref(&mut ctrl);
    register_simple_language(&mut ctrl);

    // Scenario 1: a script step in the "simple" language is expected to be refused.
    let script_steps = vec![BuilderStep::Script {
        language: "simple".into(),
        script: "${body}".into(),
    }];
    let script_failure = ctrl
        .resolve_steps(script_steps, &ProducerContext::new(), &ctrl.registry)
        .expect_err("simple script should fail for mutating expression");
    assert!(script_failure.to_string().contains("does not support"));

    // Scenario 2: referencing a bean that was never registered.
    let bean_steps = vec![BuilderStep::Bean {
        name: "unknown".into(),
        method: "run".into(),
    }];
    let bean_failure = ctrl
        .resolve_steps(bean_steps, &ProducerContext::new(), &ctrl.registry)
        .expect_err("missing bean must fail");
    assert!(bean_failure.to_string().contains("Bean not found"));

    // Scenario 3: a declarative script in a language that is not registered.
    let unregistered = LanguageExpressionDef {
        language: "unknown".into(),
        source: "x".into(),
    };
    let declarative_steps = vec![BuilderStep::DeclarativeScript {
        expression: unregistered,
    }];
    let language_failure = ctrl
        .resolve_steps(declarative_steps, &ProducerContext::new(), &ctrl.registry)
        .expect_err("unknown language must fail");
    assert!(language_failure.to_string().contains("not registered"));
}
2422
2423 #[tokio::test]
2424 async fn lifecycle_methods_report_missing_routes() {
2425 let mut controller = build_controller();
2426
2427 assert!(controller.start_route("missing").await.is_err());
2428 assert!(controller.stop_route("missing").await.is_err());
2429 assert!(controller.suspend_route("missing").await.is_err());
2430 assert!(controller.resume_route("missing").await.is_err());
2431 }
2432
2433 #[tokio::test]
2434 async fn start_stop_route_happy_path_with_timer_and_mock() {
2435 let mut controller = build_controller_with_components();
2436 set_self_ref(&mut controller);
2437
2438 let route = RouteDefinition::new(
2439 "timer:tick?period=10&repeatCount=1",
2440 vec![BuilderStep::To("mock:out".into())],
2441 )
2442 .with_route_id("rt-1");
2443 controller.add_route(route).unwrap();
2444
2445 controller.start_route("rt-1").await.unwrap();
2446 tokio::time::sleep(Duration::from_millis(40)).await;
2447 controller.stop_route("rt-1").await.unwrap();
2448
2449 controller.remove_route("rt-1").unwrap();
2450 }
2451
2452 #[tokio::test]
2453 async fn suspend_resume_and_restart_cover_execution_transitions() {
2454 let mut controller = build_controller_with_components();
2455 set_self_ref(&mut controller);
2456
2457 let route = RouteDefinition::new(
2458 "timer:tick?period=30",
2459 vec![BuilderStep::To("mock:out".into())],
2460 )
2461 .with_route_id("rt-2");
2462 controller.add_route(route).unwrap();
2463
2464 controller.start_route("rt-2").await.unwrap();
2465 controller.suspend_route("rt-2").await.unwrap();
2466 controller.resume_route("rt-2").await.unwrap();
2467 controller.restart_route("rt-2").await.unwrap();
2468 controller.stop_route("rt-2").await.unwrap();
2469 }
2470
2471 #[tokio::test]
2472 async fn remove_route_rejects_running_route() {
2473 let mut controller = build_controller_with_components();
2474 set_self_ref(&mut controller);
2475
2476 let route = RouteDefinition::new(
2477 "timer:tick?period=25",
2478 vec![BuilderStep::To("mock:out".into())],
2479 )
2480 .with_route_id("rt-running");
2481 controller.add_route(route).unwrap();
2482 controller.start_route("rt-running").await.unwrap();
2483
2484 let err = controller
2485 .remove_route("rt-running")
2486 .expect_err("running route removal must fail");
2487 assert!(err.to_string().contains("must be stopped before removal"));
2488
2489 controller.stop_route("rt-running").await.unwrap();
2490 controller.remove_route("rt-running").unwrap();
2491 }
2492
2493 #[tokio::test]
2494 async fn start_route_on_suspended_state_returns_guidance_error() {
2495 let mut controller = build_controller_with_components();
2496 set_self_ref(&mut controller);
2497
2498 let route = RouteDefinition::new(
2499 "timer:tick?period=40",
2500 vec![BuilderStep::To("mock:out".into())],
2501 )
2502 .with_route_id("rt-suspend");
2503 controller.add_route(route).unwrap();
2504
2505 controller.start_route("rt-suspend").await.unwrap();
2506 controller.suspend_route("rt-suspend").await.unwrap();
2507
2508 let err = controller
2509 .start_route("rt-suspend")
2510 .await
2511 .expect_err("start from suspended must fail");
2512 assert!(err.to_string().contains("use resume_route"));
2513
2514 controller.resume_route("rt-suspend").await.unwrap();
2515 controller.stop_route("rt-suspend").await.unwrap();
2516 }
2517
2518 #[tokio::test]
2519 async fn suspend_and_resume_validate_execution_state() {
2520 let mut controller = build_controller_with_components();
2521 set_self_ref(&mut controller);
2522
2523 controller
2524 .add_route(
2525 RouteDefinition::new("timer:tick?period=50", vec![]).with_route_id("rt-state"),
2526 )
2527 .unwrap();
2528
2529 let suspend_err = controller
2530 .suspend_route("rt-state")
2531 .await
2532 .expect_err("suspend before start must fail");
2533 assert!(suspend_err.to_string().contains("Cannot suspend route"));
2534
2535 controller.start_route("rt-state").await.unwrap();
2536 let resume_err = controller
2537 .resume_route("rt-state")
2538 .await
2539 .expect_err("resume while started must fail");
2540 assert!(resume_err.to_string().contains("Cannot resume route"));
2541
2542 controller.stop_route("rt-state").await.unwrap();
2543 }
2544
2545 #[tokio::test]
2546 async fn concurrent_concurrency_override_path_executes() {
2547 let mut controller = build_controller_with_components();
2548 set_self_ref(&mut controller);
2549
2550 let route = RouteDefinition::new(
2551 "timer:tick?period=10&repeatCount=2",
2552 vec![BuilderStep::To("mock:out".into())],
2553 )
2554 .with_route_id("rt-concurrent")
2555 .with_concurrency(ConcurrencyModel::Concurrent { max: Some(2) });
2556
2557 controller.add_route(route).unwrap();
2558 controller.start_route("rt-concurrent").await.unwrap();
2559 tokio::time::sleep(Duration::from_millis(50)).await;
2560 controller.stop_route("rt-concurrent").await.unwrap();
2561 }
2562
2563 #[tokio::test]
2564 async fn add_route_with_circuit_breaker_and_error_handler_compiles() {
2565 use camel_api::circuit_breaker::CircuitBreakerConfig;
2566 use camel_api::error_handler::ErrorHandlerConfig;
2567
2568 let mut controller = build_controller_with_components();
2569 set_self_ref(&mut controller);
2570
2571 let route = RouteDefinition::new("timer:tick?period=25", vec![BuilderStep::Stop])
2572 .with_route_id("rt-eh")
2573 .with_circuit_breaker(CircuitBreakerConfig::new())
2574 .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"));
2575
2576 controller
2577 .add_route(route)
2578 .expect("route with layers should compile");
2579 controller.start_route("rt-eh").await.unwrap();
2580 controller.stop_route("rt-eh").await.unwrap();
2581 }
2582
2583 #[tokio::test]
2584 async fn compile_and_swap_errors_for_missing_route() {
2585 let mut controller = build_controller_with_components();
2586 set_self_ref(&mut controller);
2587
2588 let compiled = controller
2589 .compile_route_definition(
2590 RouteDefinition::new("timer:tick?period=10", vec![BuilderStep::Stop])
2591 .with_route_id("compiled"),
2592 )
2593 .expect("compile should work");
2594
2595 let err = controller
2596 .swap_pipeline("nope", compiled)
2597 .expect_err("missing route swap must fail");
2598 assert!(err.to_string().contains("not found"));
2599 }
2600}