1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::{Mutex, mpsc};
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21 BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProducerContext,
22 RouteController, RuntimeCommand, RuntimeHandle, Value, body::Body,
23};
24use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
25use camel_endpoint::parse_uri;
26use camel_language_api::{Expression, Language, LanguageError, Predicate};
27pub use camel_processor::aggregator::SharedLanguageRegistry;
28use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
29use camel_processor::circuit_breaker::CircuitBreakerLayer;
30use camel_processor::error_handler::ErrorHandlerLayer;
31use camel_processor::script_mutator::ScriptMutator;
32use camel_processor::{ChoiceService, WhenClause};
33
34use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
35use crate::lifecycle::adapters::route_compiler::{
36 compose_pipeline, compose_traced_pipeline_with_contracts,
37};
38use crate::lifecycle::application::route_definition::{
39 BuilderStep, LanguageExpressionDef, RouteDefinition, RouteDefinitionInfo, ValueSourceDef,
40};
41use crate::shared::components::domain::Registry;
42use crate::shared::observability::domain::{DetailLevel, TracerConfig};
43use arc_swap::ArcSwap;
44use camel_bean::BeanRegistry;
45
/// Message describing a route failure.
///
/// Values of this type travel over the channel registered through
/// `DefaultRouteController::set_crash_notifier`.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// Identifier of the route the notification refers to.
    pub route_id: String,
    /// Textual description of the failure.
    pub error: String,
}
57
/// Newtype around [`BoxProcessor`] that is declared `Sync` so it can be stored
/// inside the `ArcSwap` used by `SharedPipeline`.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: NOTE(review) — the invariant that makes this sound is not visible in
// this part of the file. Presumably the wrapped processor is only ever cloned
// out of the shared slot and never driven through a shared reference from
// several threads at once; confirm against every use of `SharedPipeline`.
unsafe impl Sync for SyncBoxProcessor {}
74
/// Shared, atomically replaceable handle to a route pipeline.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
76
/// Controller operations used inside the crate, layered on top of the public
/// [`RouteController`] trait.
///
/// The summaries below are derived from the method signatures and from the
/// inherent methods of `DefaultRouteController` visible in this file;
/// behaviour of implementations not shown here should be confirmed there.
#[async_trait::async_trait]
pub trait RouteControllerInternal: RouteController + Send {
    /// Registers the route described by `def`.
    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError>;

    /// Replaces the pipeline of the route identified by `route_id`.
    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError>;

    /// Returns the `from` URI of the route, or `None` (presumably when the
    /// route is unknown).
    fn route_from_uri(&self, route_id: &str) -> Option<String>;

    /// Sets the error handler configuration used for routes that do not
    /// declare their own.
    fn set_error_handler(&mut self, config: ErrorHandlerConfig);

    /// Stores a shared handle to the controller itself (guarded by a tokio
    /// `Mutex`).
    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>);

    /// Stores the runtime handle used when building producer contexts.
    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>);

    /// Number of routes known to the controller.
    fn route_count(&self) -> usize;

    /// In-flight exchange count for the route, or `None` (presumably when the
    /// route is unknown or does not track in-flight exchanges).
    fn in_flight_count(&self, route_id: &str) -> Option<u64>;

    /// Whether a route with this identifier is registered.
    fn route_exists(&self, route_id: &str) -> bool;

    /// Identifiers of all registered routes.
    fn route_ids(&self) -> Vec<String>;

    /// Identifiers of the routes that should be started automatically.
    fn auto_startup_route_ids(&self) -> Vec<String>;

    /// Identifiers of the routes to stop on shutdown (ordering semantics are
    /// defined by the implementation).
    fn shutdown_route_ids(&self) -> Vec<String>;

    /// Applies tracer settings used for pipelines compiled afterwards.
    fn set_tracer_config(&mut self, config: &TracerConfig);

    /// Compiles `def` into a pipeline without registering it as a route.
    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError>;

    /// Removes the route identified by `route_id`.
    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Starts a route as part of a reload.
    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Stops a route as part of a reload.
    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
}
136
/// Pieces of a route that was split around its top-level aggregate step.
///
/// `add_route` fills this in when `find_top_level_aggregate_with_timeout`
/// reports a match.
struct AggregateSplitInfo {
    /// Pipeline composed from the steps that precede the aggregate step.
    pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step that was cut out of the route.
    agg_config: AggregatorConfig,
    /// Pipeline composed from the steps that follow the aggregate step.
    post_pipeline: SharedPipeline,
}
143
/// Per-route bookkeeping kept by the controller.
struct ManagedRoute {
    /// Summary of the definition the route was created from.
    definition: RouteDefinitionInfo,
    /// Source endpoint URI of the route.
    from_uri: String,
    /// Compiled, hot-swappable processing pipeline.
    pipeline: SharedPipeline,
    /// Optional concurrency override taken from the route definition.
    concurrency: Option<ConcurrencyModel>,
    /// Task handle of the consumer; `None` when the route is first added.
    consumer_handle: Option<JoinHandle<()>>,
    /// Task handle of the pipeline; `None` when the route is first added.
    pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token associated with the consumer task.
    consumer_cancel_token: CancellationToken,
    /// Cancellation token associated with the pipeline task.
    pipeline_cancel_token: CancellationToken,
    /// Sender side of the exchange channel; `None` when the route is first added.
    channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// Counter shared with the unit-of-work layer; present only when the
    /// route declares a unit-of-work configuration.
    in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Set when the route was split around a top-level aggregate step.
    aggregate_split: Option<AggregateSplitInfo>,
    /// Aggregator service instance; `None` when the route is first added.
    agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
171
172fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
173 handle.as_ref().is_some_and(|h| !h.is_finished())
174}
175
176fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
177 match (
178 handle_is_running(&managed.consumer_handle),
179 handle_is_running(&managed.pipeline_handle),
180 ) {
181 (true, true) => "Started",
182 (false, true) => "Suspended",
183 (true, false) => "Stopping",
184 (false, false) => "Stopped",
185 }
186}
187
188fn find_top_level_aggregate_with_timeout(
189 steps: &[BuilderStep],
190) -> Option<(usize, AggregatorConfig)> {
191 for (i, step) in steps.iter().enumerate() {
192 if let BuilderStep::Aggregate { config } = step {
193 if has_timeout_condition(&config.completion) {
194 return Some((i, config.clone()));
195 }
196 break;
197 }
198 }
199 None
200}
201
202fn is_pending(ex: &Exchange) -> bool {
203 ex.property("CamelAggregatorPending")
204 .and_then(|v| v.as_bool())
205 .unwrap_or(false)
206}
207
208async fn ready_with_backoff(
215 pipeline: &mut BoxProcessor,
216 cancel: &CancellationToken,
217) -> Result<(), CamelError> {
218 loop {
219 match pipeline.ready().await {
220 Ok(_) => return Ok(()),
221 Err(CamelError::CircuitOpen(ref msg)) => {
222 warn!("Circuit open, backing off: {msg}");
223 tokio::select! {
224 _ = tokio::time::sleep(Duration::from_secs(1)) => {
225 continue;
226 }
227 _ = cancel.cancelled() => {
228 return Err(CamelError::CircuitOpen(msg.clone()));
230 }
231 }
232 }
233 Err(e) => {
234 error!("Pipeline not ready: {e}");
235 return Err(e);
236 }
237 }
238 }
239}
240
241fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
242 let stamp = std::time::SystemTime::now()
243 .duration_since(std::time::UNIX_EPOCH)
244 .unwrap_or_default()
245 .as_nanos();
246 RuntimeCommand::FailRoute {
247 route_id: route_id.to_string(),
248 error: error.to_string(),
249 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
250 causation_id: None,
251 }
252}
253
254async fn publish_runtime_failure(
255 runtime: Option<Weak<dyn RuntimeHandle>>,
256 route_id: &str,
257 error: &str,
258) {
259 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
260 return;
261 };
262 let command = runtime_failure_command(route_id, error);
263 if let Err(runtime_error) = runtime.execute(command).await {
264 warn!(
265 route_id = %route_id,
266 error = %runtime_error,
267 "failed to synchronize route crash with runtime projection"
268 );
269 }
270}
271
/// Default route controller: owns the registered routes and the shared
/// registries needed to compile their pipelines.
pub struct DefaultRouteController {
    /// Registered routes keyed by route id.
    routes: HashMap<String, ManagedRoute>,
    /// Component registry used to resolve endpoint URIs by scheme.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Languages available for compiling expressions and predicates.
    languages: SharedLanguageRegistry,
    /// Beans that `Bean` steps look up by name.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Shared handle to this controller, set via `set_self_ref`.
    self_ref: Option<Arc<Mutex<dyn RouteController>>>,
    /// Weak handle to the runtime, set via `set_runtime_handle`.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Error handler applied to routes that do not declare their own.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Channel on which crash notifications are sent, if registered.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether compiled pipelines are traced.
    tracing_enabled: bool,
    /// Detail level handed to the traced pipeline composer.
    tracer_detail_level: DetailLevel,
    /// Optional metrics collector handed to the traced pipeline composer.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
}
304
305impl DefaultRouteController {
306 pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
308 Self::with_beans(
309 registry,
310 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
311 )
312 }
313
314 pub fn with_beans(
316 registry: Arc<std::sync::Mutex<Registry>>,
317 beans: Arc<std::sync::Mutex<BeanRegistry>>,
318 ) -> Self {
319 Self {
320 routes: HashMap::new(),
321 registry,
322 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
323 beans,
324 self_ref: None,
325 runtime: None,
326 global_error_handler: None,
327 crash_notifier: None,
328 tracing_enabled: false,
329 tracer_detail_level: DetailLevel::Minimal,
330 tracer_metrics: None,
331 }
332 }
333
334 pub fn with_languages(
336 registry: Arc<std::sync::Mutex<Registry>>,
337 languages: SharedLanguageRegistry,
338 ) -> Self {
339 Self {
340 routes: HashMap::new(),
341 registry,
342 languages,
343 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
344 self_ref: None,
345 runtime: None,
346 global_error_handler: None,
347 crash_notifier: None,
348 tracing_enabled: false,
349 tracer_detail_level: DetailLevel::Minimal,
350 tracer_metrics: None,
351 }
352 }
353
    /// Stores a shared handle to this controller, replacing any previous one.
    ///
    /// The handle is returned again by `self_ref_for_supervision`.
    pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
        self.self_ref = Some(self_ref);
    }
360
361 pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
363 self.runtime = Some(Arc::downgrade(&runtime));
364 }
365
366 pub fn self_ref_for_supervision(&self) -> Option<Arc<Mutex<dyn RouteController>>> {
371 self.self_ref.clone()
372 }
373
374 pub fn runtime_handle_for_supervision(&self) -> Option<Arc<dyn RuntimeHandle>> {
376 self.runtime.as_ref().and_then(Weak::upgrade)
377 }
378
    /// Registers the channel on which `CrashNotification` messages are sent,
    /// replacing any previously registered sender.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }
386
    /// Sets the global error handler configuration.
    ///
    /// It is used when compiling routes that do not declare an error handler
    /// of their own.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }
391
392 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
394 self.tracing_enabled = config.enabled;
395 self.tracer_detail_level = config.detail_level.clone();
396 self.tracer_metrics = config.metrics_collector.clone();
397 }
398
399 fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
400 let mut producer_ctx = ProducerContext::new();
401 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
402 producer_ctx = producer_ctx.with_runtime(runtime);
403 }
404 Ok(producer_ctx)
405 }
406
407 fn resolve_error_handler(
409 &self,
410 config: ErrorHandlerConfig,
411 producer_ctx: &ProducerContext,
412 registry: &Registry,
413 ) -> Result<ErrorHandlerLayer, CamelError> {
414 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
416 let parsed = parse_uri(uri)?;
417 let component = registry.get_or_err(&parsed.scheme)?;
418 let endpoint = component.create_endpoint(uri)?;
419 Some(endpoint.create_producer(producer_ctx)?)
420 } else {
421 None
422 };
423
424 let mut resolved_policies = Vec::new();
426 for policy in config.policies {
427 let handler_producer = if let Some(ref uri) = policy.handled_by {
428 let parsed = parse_uri(uri)?;
429 let component = registry.get_or_err(&parsed.scheme)?;
430 let endpoint = component.create_endpoint(uri)?;
431 Some(endpoint.create_producer(producer_ctx)?)
432 } else {
433 None
434 };
435 resolved_policies.push((policy, handler_producer));
436 }
437
438 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
439 }
440
441 fn resolve_uow_layer(
444 &self,
445 config: &UnitOfWorkConfig,
446 producer_ctx: &ProducerContext,
447 registry: &Registry,
448 counter: Option<Arc<AtomicU64>>,
449 ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
450 let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
451 let parsed = parse_uri(uri)?;
452 let component = registry.get_or_err(&parsed.scheme)?;
453 let endpoint = component.create_endpoint(uri)?;
454 endpoint.create_producer(producer_ctx).map_err(|e| {
455 CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
456 })
457 };
458
459 let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
460 let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
461
462 let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
463 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
464 Ok((layer, counter))
465 }
466
467 fn resolve_language(&self, language: &str) -> Result<Arc<dyn Language>, CamelError> {
468 let guard = self
469 .languages
470 .lock()
471 .expect("mutex poisoned: another thread panicked while holding this lock");
472 guard.get(language).cloned().ok_or_else(|| {
473 CamelError::RouteError(format!(
474 "language `{language}` is not registered in CamelContext"
475 ))
476 })
477 }
478
479 fn compile_language_expression(
480 &self,
481 expression: &LanguageExpressionDef,
482 ) -> Result<Arc<dyn Expression>, CamelError> {
483 let language = self.resolve_language(&expression.language)?;
484 let compiled = language
485 .create_expression(&expression.source)
486 .map_err(|e| {
487 CamelError::RouteError(format!(
488 "failed to compile {} expression `{}`: {e}",
489 expression.language, expression.source
490 ))
491 })?;
492 Ok(Arc::from(compiled))
493 }
494
495 fn compile_language_predicate(
496 &self,
497 expression: &LanguageExpressionDef,
498 ) -> Result<Arc<dyn Predicate>, CamelError> {
499 let language = self.resolve_language(&expression.language)?;
500 let compiled = language.create_predicate(&expression.source).map_err(|e| {
501 CamelError::RouteError(format!(
502 "failed to compile {} predicate `{}`: {e}",
503 expression.language, expression.source
504 ))
505 })?;
506 Ok(Arc::from(compiled))
507 }
508
509 fn compile_filter_predicate(
510 &self,
511 expression: &LanguageExpressionDef,
512 ) -> Result<FilterPredicate, CamelError> {
513 let predicate = self.compile_language_predicate(expression)?;
514 Ok(Arc::new(move |exchange: &Exchange| {
515 predicate.matches(exchange).unwrap_or(false)
516 }))
517 }
518
519 fn value_to_body(value: Value) -> Body {
520 match value {
521 Value::Null => Body::Empty,
522 Value::String(text) => Body::Text(text),
523 other => Body::Json(other),
524 }
525 }
526
527 pub(crate) fn resolve_steps(
529 &self,
530 steps: Vec<BuilderStep>,
531 producer_ctx: &ProducerContext,
532 registry: &Arc<std::sync::Mutex<Registry>>,
533 ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
534 let resolve_producer = |uri: &str| -> Result<BoxProcessor, CamelError> {
535 let parsed = parse_uri(uri)?;
536 let registry_guard = registry
537 .lock()
538 .expect("mutex poisoned: another thread panicked while holding this lock");
539 let component = registry_guard.get_or_err(&parsed.scheme)?;
540 let endpoint = component.create_endpoint(uri)?;
541 endpoint.create_producer(producer_ctx)
542 };
543
544 let mut processors: Vec<(BoxProcessor, Option<camel_api::BodyType>)> = Vec::new();
545 for step in steps {
546 match step {
547 BuilderStep::Processor(svc) => {
548 processors.push((svc, None));
549 }
550 BuilderStep::To(uri) => {
551 let parsed = parse_uri(&uri)?;
552 let registry_guard = registry
553 .lock()
554 .expect("mutex poisoned: another thread panicked while holding this lock");
555 let component = registry_guard.get_or_err(&parsed.scheme)?;
556 let endpoint = component.create_endpoint(&uri)?;
557 let contract = endpoint.body_contract();
558 let producer = endpoint.create_producer(producer_ctx)?;
559 processors.push((producer, contract));
560 }
561 BuilderStep::Stop => {
562 processors.push((BoxProcessor::new(camel_processor::StopService), None));
563 }
564 BuilderStep::Log { level, message } => {
565 let svc = camel_processor::LogProcessor::new(level, message);
566 processors.push((BoxProcessor::new(svc), None));
567 }
568 BuilderStep::DeclarativeSetHeader { key, value } => match value {
569 ValueSourceDef::Literal(value) => {
570 let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
571 processors.push((BoxProcessor::new(svc), None));
572 }
573 ValueSourceDef::Expression(expression) => {
574 let expression = self.compile_language_expression(&expression)?;
575 let svc = camel_processor::DynamicSetHeader::new(
576 IdentityProcessor,
577 key,
578 move |exchange: &Exchange| {
579 expression.evaluate(exchange).unwrap_or(Value::Null)
580 },
581 );
582 processors.push((BoxProcessor::new(svc), None));
583 }
584 },
585 BuilderStep::DeclarativeSetBody { value } => match value {
586 ValueSourceDef::Literal(value) => {
587 let body = Self::value_to_body(value);
588 let svc = camel_processor::SetBody::new(
589 IdentityProcessor,
590 move |_exchange: &Exchange| body.clone(),
591 );
592 processors.push((BoxProcessor::new(svc), None));
593 }
594 ValueSourceDef::Expression(expression) => {
595 let expression = self.compile_language_expression(&expression)?;
596 let svc = camel_processor::SetBody::new(
597 IdentityProcessor,
598 move |exchange: &Exchange| {
599 let value = expression.evaluate(exchange).unwrap_or(Value::Null);
600 Self::value_to_body(value)
601 },
602 );
603 processors.push((BoxProcessor::new(svc), None));
604 }
605 },
606 BuilderStep::DeclarativeFilter { predicate, steps } => {
607 let predicate = self.compile_filter_predicate(&predicate)?;
608 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
609 let sub_processors: Vec<BoxProcessor> =
610 sub_pairs.into_iter().map(|(p, _)| p).collect();
611 let sub_pipeline = compose_pipeline(sub_processors);
612 let svc =
613 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
614 processors.push((BoxProcessor::new(svc), None));
615 }
616 BuilderStep::DeclarativeChoice { whens, otherwise } => {
617 let mut when_clauses = Vec::new();
618 for when_step in whens {
619 let predicate = self.compile_filter_predicate(&when_step.predicate)?;
620 let sub_pairs =
621 self.resolve_steps(when_step.steps, producer_ctx, registry)?;
622 let sub_processors: Vec<BoxProcessor> =
623 sub_pairs.into_iter().map(|(p, _)| p).collect();
624 let pipeline = compose_pipeline(sub_processors);
625 when_clauses.push(WhenClause {
626 predicate,
627 pipeline,
628 });
629 }
630 let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
631 let sub_pairs =
632 self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
633 let sub_processors: Vec<BoxProcessor> =
634 sub_pairs.into_iter().map(|(p, _)| p).collect();
635 Some(compose_pipeline(sub_processors))
636 } else {
637 None
638 };
639 let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
640 processors.push((BoxProcessor::new(svc), None));
641 }
642 BuilderStep::DeclarativeScript { expression } => {
643 let lang = self.resolve_language(&expression.language)?;
644 match lang.create_mutating_expression(&expression.source) {
645 Ok(mut_expr) => {
646 processors
647 .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
648 }
649 Err(LanguageError::NotSupported { .. }) => {
650 let expression = self.compile_language_expression(&expression)?;
658 let svc = camel_processor::SetBody::new(
659 IdentityProcessor,
660 move |exchange: &Exchange| {
661 let value =
662 expression.evaluate(exchange).unwrap_or(Value::Null);
663 Self::value_to_body(value)
664 },
665 );
666 processors.push((BoxProcessor::new(svc), None));
667 }
668 Err(e) => {
669 return Err(CamelError::RouteError(format!(
670 "Failed to create mutating expression for language '{}': {}",
671 expression.language, e
672 )));
673 }
674 }
675 }
676 BuilderStep::Split { config, steps } => {
677 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
678 let sub_processors: Vec<BoxProcessor> =
679 sub_pairs.into_iter().map(|(p, _)| p).collect();
680 let sub_pipeline = compose_pipeline(sub_processors);
681 let splitter =
682 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
683 processors.push((BoxProcessor::new(splitter), None));
684 }
685 BuilderStep::DeclarativeSplit {
686 expression,
687 aggregation,
688 parallel,
689 parallel_limit,
690 stop_on_exception,
691 steps,
692 } => {
693 let lang_expr = self.compile_language_expression(&expression)?;
694 let split_fn = move |exchange: &Exchange| {
695 let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
696 match value {
697 Value::String(s) => s
698 .lines()
699 .filter(|line| !line.is_empty())
700 .map(|line| {
701 let mut fragment = exchange.clone();
702 fragment.input.body = Body::from(line.to_string());
703 fragment
704 })
705 .collect(),
706 Value::Array(arr) => arr
707 .into_iter()
708 .map(|v| {
709 let mut fragment = exchange.clone();
710 fragment.input.body = Body::from(v);
711 fragment
712 })
713 .collect(),
714 _ => vec![exchange.clone()],
715 }
716 };
717
718 let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
719 .aggregation(aggregation)
720 .parallel(parallel)
721 .stop_on_exception(stop_on_exception);
722 if let Some(limit) = parallel_limit {
723 config = config.parallel_limit(limit);
724 }
725
726 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
727 let sub_processors: Vec<BoxProcessor> =
728 sub_pairs.into_iter().map(|(p, _)| p).collect();
729 let sub_pipeline = compose_pipeline(sub_processors);
730 let splitter =
731 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
732 processors.push((BoxProcessor::new(splitter), None));
733 }
734 BuilderStep::Aggregate { config } => {
735 let (late_tx, _late_rx) = mpsc::channel(256);
736 let registry: SharedLanguageRegistry =
737 Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
738 let cancel = CancellationToken::new();
739 let svc =
740 camel_processor::AggregatorService::new(config, late_tx, registry, cancel);
741 processors.push((BoxProcessor::new(svc), None));
742 }
743 BuilderStep::Filter { predicate, steps } => {
744 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
745 let sub_processors: Vec<BoxProcessor> =
746 sub_pairs.into_iter().map(|(p, _)| p).collect();
747 let sub_pipeline = compose_pipeline(sub_processors);
748 let svc =
749 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
750 processors.push((BoxProcessor::new(svc), None));
751 }
752 BuilderStep::Choice { whens, otherwise } => {
753 let mut when_clauses = Vec::new();
755 for when_step in whens {
756 let sub_pairs =
757 self.resolve_steps(when_step.steps, producer_ctx, registry)?;
758 let sub_processors: Vec<BoxProcessor> =
759 sub_pairs.into_iter().map(|(p, _)| p).collect();
760 let pipeline = compose_pipeline(sub_processors);
761 when_clauses.push(WhenClause {
762 predicate: when_step.predicate,
763 pipeline,
764 });
765 }
766 let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
768 let sub_pairs =
769 self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
770 let sub_processors: Vec<BoxProcessor> =
771 sub_pairs.into_iter().map(|(p, _)| p).collect();
772 Some(compose_pipeline(sub_processors))
773 } else {
774 None
775 };
776 let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
777 processors.push((BoxProcessor::new(svc), None));
778 }
779 BuilderStep::WireTap { uri } => {
780 let producer = resolve_producer(&uri)?;
781 let svc = camel_processor::WireTapService::new(producer);
782 processors.push((BoxProcessor::new(svc), None));
783 }
784 BuilderStep::Multicast { config, steps } => {
785 let mut endpoints = Vec::new();
787 for step in steps {
788 let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
789 let sub_processors: Vec<BoxProcessor> =
790 sub_pairs.into_iter().map(|(p, _)| p).collect();
791 let endpoint = compose_pipeline(sub_processors);
792 endpoints.push(endpoint);
793 }
794 let svc = camel_processor::MulticastService::new(endpoints, config);
795 processors.push((BoxProcessor::new(svc), None));
796 }
797 BuilderStep::DeclarativeLog { level, message } => {
798 let ValueSourceDef::Expression(expression) = message else {
799 unreachable!(
802 "DeclarativeLog with Literal should have been compiled to a Processor"
803 );
804 };
805 let expression = self.compile_language_expression(&expression)?;
806 let svc =
807 camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
808 expression
809 .evaluate(exchange)
810 .unwrap_or_else(|e| {
811 warn!(error = %e, "log expression evaluation failed");
812 Value::Null
813 })
814 .to_string()
815 });
816 processors.push((BoxProcessor::new(svc), None));
817 }
818 BuilderStep::Bean { name, method } => {
819 let beans = self.beans.lock().expect(
821 "beans mutex poisoned: another thread panicked while holding this lock",
822 );
823
824 let bean = beans.get(&name).ok_or_else(|| {
826 CamelError::ProcessorError(format!("Bean not found: {}", name))
827 })?;
828
829 let bean_clone = Arc::clone(&bean);
831 let method = method.clone();
832
833 let processor = tower::service_fn(move |mut exchange: Exchange| {
835 let bean = Arc::clone(&bean_clone);
836 let method = method.clone();
837
838 async move {
839 bean.call(&method, &mut exchange).await?;
840 Ok(exchange)
841 }
842 });
843
844 processors.push((BoxProcessor::new(processor), None));
845 }
846 BuilderStep::Script { language, script } => {
847 let lang = self.resolve_language(&language)?;
848 match lang.create_mutating_expression(&script) {
849 Ok(mut_expr) => {
850 processors
851 .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
852 }
853 Err(LanguageError::NotSupported {
854 feature,
855 language: ref lang_name,
856 }) => {
857 return Err(CamelError::RouteError(format!(
860 "Language '{}' does not support {} (required for .script() step)",
861 lang_name, feature
862 )));
863 }
864 Err(e) => {
865 return Err(CamelError::RouteError(format!(
866 "Failed to create mutating expression for language '{}': {}",
867 language, e
868 )));
869 }
870 }
871 }
872 BuilderStep::Throttle { config, steps } => {
873 let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
874 let sub_processors: Vec<BoxProcessor> =
875 sub_pairs.into_iter().map(|(p, _)| p).collect();
876 let sub_pipeline = compose_pipeline(sub_processors);
877 let svc =
878 camel_processor::throttler::ThrottlerService::new(config, sub_pipeline);
879 processors.push((BoxProcessor::new(svc), None));
880 }
881 BuilderStep::LoadBalance { config, steps } => {
882 let mut endpoints = Vec::new();
884 for step in steps {
885 let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
886 let sub_processors: Vec<BoxProcessor> =
887 sub_pairs.into_iter().map(|(p, _)| p).collect();
888 let endpoint = compose_pipeline(sub_processors);
889 endpoints.push(endpoint);
890 }
891 let svc =
892 camel_processor::load_balancer::LoadBalancerService::new(endpoints, config);
893 processors.push((BoxProcessor::new(svc), None));
894 }
895 BuilderStep::DynamicRouter { config } => {
896 use camel_api::EndpointResolver;
897
898 let producer_ctx_clone = producer_ctx.clone();
899 let registry_clone = Arc::clone(registry);
900 let resolver: EndpointResolver = Arc::new(move |uri: &str| {
901 let parsed = match parse_uri(uri) {
902 Ok(p) => p,
903 Err(_) => return None,
904 };
905 let registry_guard = match registry_clone.lock() {
906 Ok(g) => g,
907 Err(_) => return None, };
909 let component = match registry_guard.get_or_err(&parsed.scheme) {
910 Ok(c) => c,
911 Err(_) => return None,
912 };
913 let endpoint = match component.create_endpoint(uri) {
914 Ok(e) => e,
915 Err(_) => return None,
916 };
917 let producer = match endpoint.create_producer(&producer_ctx_clone) {
918 Ok(p) => p,
919 Err(_) => return None,
920 };
921 Some(BoxProcessor::new(producer))
922 });
923 let svc = camel_processor::dynamic_router::DynamicRouterService::new(
924 config, resolver,
925 );
926 processors.push((BoxProcessor::new(svc), None));
927 }
928 BuilderStep::RoutingSlip { config } => {
929 use camel_api::EndpointResolver;
930
931 let producer_ctx_clone = producer_ctx.clone();
932 let registry_clone = registry.clone();
933 let resolver: EndpointResolver = Arc::new(move |uri: &str| {
934 let parsed = match parse_uri(uri) {
935 Ok(p) => p,
936 Err(_) => return None,
937 };
938 let registry_guard = match registry_clone.lock() {
939 Ok(g) => g,
940 Err(_) => return None,
941 };
942 let component = match registry_guard.get_or_err(&parsed.scheme) {
943 Ok(c) => c,
944 Err(_) => return None,
945 };
946 let endpoint = match component.create_endpoint(uri) {
947 Ok(e) => e,
948 Err(_) => return None,
949 };
950 let producer = match endpoint.create_producer(&producer_ctx_clone) {
951 Ok(p) => p,
952 Err(_) => return None,
953 };
954 Some(BoxProcessor::new(producer))
955 });
956
957 let svc =
958 camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
959 processors.push((BoxProcessor::new(svc), None));
960 }
961 }
962 }
963 Ok(processors)
964 }
965
966 pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
976 let route_id = definition.route_id().to_string();
977
978 if self.routes.contains_key(&route_id) {
979 return Err(CamelError::RouteError(format!(
980 "Route '{}' already exists",
981 route_id
982 )));
983 }
984
985 info!(route_id = %route_id, "Adding route to controller");
986
987 let definition_info = definition.to_info();
989 let RouteDefinition {
990 from_uri,
991 steps,
992 error_handler,
993 circuit_breaker,
994 unit_of_work,
995 concurrency,
996 ..
997 } = definition;
998
999 let producer_ctx = self.build_producer_context()?;
1001
1002 let mut aggregate_split: Option<AggregateSplitInfo> = None;
1004 let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
1005 Some((idx, agg_config)) => {
1006 let mut pre_steps = steps;
1007 let mut rest = pre_steps.split_off(idx);
1008 let _agg_step = rest.remove(0);
1009 let post_steps = rest;
1010
1011 let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
1012 let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
1013 let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
1014 compose_pipeline(pre_procs),
1015 )));
1016
1017 let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
1018 let post_procs: Vec<BoxProcessor> =
1019 post_pairs.into_iter().map(|(p, _)| p).collect();
1020 let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
1021 compose_pipeline(post_procs),
1022 )));
1023
1024 aggregate_split = Some(AggregateSplitInfo {
1025 pre_pipeline,
1026 agg_config,
1027 post_pipeline,
1028 });
1029
1030 vec![]
1031 }
1032 None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
1033 };
1034 let route_id_for_tracing = route_id.clone();
1035 let mut pipeline = if processors_with_contracts.is_empty() {
1036 BoxProcessor::new(IdentityProcessor)
1037 } else {
1038 compose_traced_pipeline_with_contracts(
1039 processors_with_contracts,
1040 &route_id_for_tracing,
1041 self.tracing_enabled,
1042 self.tracer_detail_level.clone(),
1043 self.tracer_metrics.clone(),
1044 )
1045 };
1046
1047 if let Some(cb_config) = circuit_breaker {
1049 let cb_layer = CircuitBreakerLayer::new(cb_config);
1050 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
1051 }
1052
1053 let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
1055
1056 if let Some(config) = eh_config {
1057 let registry = self
1059 .registry
1060 .lock()
1061 .expect("mutex poisoned: another thread panicked while holding this lock");
1062 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
1063 pipeline = BoxProcessor::new(layer.layer(pipeline));
1064 }
1065
1066 let uow_counter = if let Some(uow_config) = &unit_of_work {
1068 let registry = self
1069 .registry
1070 .lock()
1071 .expect("mutex poisoned: registry lock in add_route uow");
1072 let (uow_layer, counter) =
1073 self.resolve_uow_layer(uow_config, &producer_ctx, ®istry, None)?;
1074 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1075 Some(counter)
1076 } else {
1077 None
1078 };
1079
1080 self.routes.insert(
1081 route_id.clone(),
1082 ManagedRoute {
1083 definition: definition_info,
1084 from_uri,
1085 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
1086 concurrency,
1087 consumer_handle: None,
1088 pipeline_handle: None,
1089 consumer_cancel_token: CancellationToken::new(),
1090 pipeline_cancel_token: CancellationToken::new(),
1091 channel_sender: None,
1092 in_flight: uow_counter,
1093 aggregate_split,
1094 agg_service: None,
1095 },
1096 );
1097
1098 Ok(())
1099 }
1100
1101 pub fn compile_route_definition(
1106 &self,
1107 def: RouteDefinition,
1108 ) -> Result<BoxProcessor, CamelError> {
1109 let route_id = def.route_id().to_string();
1110
1111 let producer_ctx = self.build_producer_context()?;
1112
1113 let processors_with_contracts =
1114 self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
1115 let mut pipeline = compose_traced_pipeline_with_contracts(
1116 processors_with_contracts,
1117 &route_id,
1118 self.tracing_enabled,
1119 self.tracer_detail_level.clone(),
1120 self.tracer_metrics.clone(),
1121 );
1122
1123 if let Some(cb_config) = def.circuit_breaker {
1124 let cb_layer = CircuitBreakerLayer::new(cb_config);
1125 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
1126 }
1127
1128 let eh_config = def
1129 .error_handler
1130 .clone()
1131 .or_else(|| self.global_error_handler.clone());
1132 if let Some(config) = eh_config {
1133 let registry = self
1135 .registry
1136 .lock()
1137 .expect("mutex poisoned: registry lock in compile_route_definition");
1138 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
1139 pipeline = BoxProcessor::new(layer.layer(pipeline));
1140 }
1141
1142 if let Some(uow_config) = &def.unit_of_work {
1144 let existing_counter = self
1145 .routes
1146 .get(&route_id)
1147 .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
1148
1149 let registry = self
1150 .registry
1151 .lock()
1152 .expect("mutex poisoned: registry lock in compile_route_definition uow");
1153
1154 let (uow_layer, _counter) =
1155 self.resolve_uow_layer(uow_config, &producer_ctx, ®istry, existing_counter)?;
1156
1157 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1158 }
1159
1160 Ok(pipeline)
1161 }
1162
1163 pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1169 let managed = self.routes.get(route_id).ok_or_else(|| {
1170 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
1171 })?;
1172 if handle_is_running(&managed.consumer_handle)
1173 || handle_is_running(&managed.pipeline_handle)
1174 {
1175 return Err(CamelError::RouteError(format!(
1176 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
1177 route_id,
1178 inferred_lifecycle_label(managed)
1179 )));
1180 }
1181 self.routes.remove(route_id);
1182 info!(route_id = %route_id, "Route removed from controller");
1183 Ok(())
1184 }
1185
1186 pub fn route_count(&self) -> usize {
1188 self.routes.len()
1189 }
1190
1191 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1192 self.routes.get(route_id).map(|r| {
1193 r.in_flight
1194 .as_ref()
1195 .map_or(0, |c| c.load(Ordering::Relaxed))
1196 })
1197 }
1198
1199 pub fn route_exists(&self, route_id: &str) -> bool {
1201 self.routes.contains_key(route_id)
1202 }
1203
1204 pub fn route_ids(&self) -> Vec<String> {
1206 self.routes.keys().cloned().collect()
1207 }
1208
1209 pub fn auto_startup_route_ids(&self) -> Vec<String> {
1211 let mut pairs: Vec<(String, i32)> = self
1212 .routes
1213 .iter()
1214 .filter(|(_, managed)| managed.definition.auto_startup())
1215 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1216 .collect();
1217 pairs.sort_by_key(|(_, order)| *order);
1218 pairs.into_iter().map(|(id, _)| id).collect()
1219 }
1220
1221 pub fn shutdown_route_ids(&self) -> Vec<String> {
1223 let mut pairs: Vec<(String, i32)> = self
1224 .routes
1225 .iter()
1226 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1227 .collect();
1228 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1229 pairs.into_iter().map(|(id, _)| id).collect()
1230 }
1231
1232 pub fn swap_pipeline(
1237 &self,
1238 route_id: &str,
1239 new_pipeline: BoxProcessor,
1240 ) -> Result<(), CamelError> {
1241 let managed = self
1242 .routes
1243 .get(route_id)
1244 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1245
1246 if managed.aggregate_split.is_some() {
1247 tracing::warn!(
1248 route_id = %route_id,
1249 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
1250 );
1251 }
1252
1253 managed
1254 .pipeline
1255 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1256 info!(route_id = %route_id, "Pipeline swapped atomically");
1257 Ok(())
1258 }
1259
1260 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1262 self.routes.get(route_id).map(|r| r.from_uri.clone())
1263 }
1264
1265 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1270 self.routes
1271 .get(route_id)
1272 .map(|r| r.pipeline.load().0.clone())
1273 }
1274
1275 async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1277 let managed = self
1278 .routes
1279 .get_mut(route_id)
1280 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1281
1282 if !handle_is_running(&managed.consumer_handle)
1283 && !handle_is_running(&managed.pipeline_handle)
1284 {
1285 return Ok(());
1286 }
1287
1288 info!(route_id = %route_id, "Stopping route");
1289
1290 let managed = self
1292 .routes
1293 .get_mut(route_id)
1294 .expect("invariant: route must exist after prior existence check");
1295 managed.consumer_cancel_token.cancel();
1296
1297 let managed = self
1299 .routes
1300 .get_mut(route_id)
1301 .expect("invariant: route must exist after prior existence check");
1302 if let Some(agg_svc) = &managed.agg_service {
1303 let guard = agg_svc.lock().unwrap();
1304 guard.force_complete_all();
1305 }
1306
1307 let managed = self
1308 .routes
1309 .get_mut(route_id)
1310 .expect("invariant: route must exist after prior existence check");
1311 managed.pipeline_cancel_token.cancel();
1312
1313 let managed = self
1315 .routes
1316 .get_mut(route_id)
1317 .expect("invariant: route must exist after prior existence check");
1318 let consumer_handle = managed.consumer_handle.take();
1319 let pipeline_handle = managed.pipeline_handle.take();
1320
1321 let managed = self
1324 .routes
1325 .get_mut(route_id)
1326 .expect("invariant: route must exist after prior existence check");
1327 managed.channel_sender = None;
1328
1329 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1333 match (consumer_handle, pipeline_handle) {
1334 (Some(c), Some(p)) => {
1335 let _ = tokio::join!(c, p);
1336 }
1337 (Some(c), None) => {
1338 let _ = c.await;
1339 }
1340 (None, Some(p)) => {
1341 let _ = p.await;
1342 }
1343 (None, None) => {}
1344 }
1345 })
1346 .await;
1347
1348 if timeout_result.is_err() {
1349 warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
1350 }
1351
1352 let managed = self
1354 .routes
1355 .get_mut(route_id)
1356 .expect("invariant: route must exist after prior existence check");
1357
1358 managed.consumer_cancel_token = CancellationToken::new();
1360 managed.pipeline_cancel_token = CancellationToken::new();
1361
1362 info!(route_id = %route_id, "Route stopped");
1363 Ok(())
1364 }
1365}
1366
1367#[async_trait::async_trait]
1368impl RouteController for DefaultRouteController {
    /// Starts the route `route_id`: creates its consumer, spawns the consumer
    /// task and a pipeline task, and records both handles on the managed route.
    ///
    /// Behaviour by current state:
    /// - consumer and pipeline both running: returns `Ok(())` without changes;
    /// - only the pipeline running: rejected as "suspended";
    /// - only the consumer running: rejected as "inconsistent";
    /// - neither running: the route is started.
    ///
    /// Routes with an aggregate split get a dedicated pipeline loop (pre
    /// pipeline, aggregator, post pipeline); all other routes get either a
    /// sequential or a concurrent pipeline loop depending on the effective
    /// concurrency model.
    ///
    /// # Errors
    /// Returns `CamelError::RouteError` for an unknown route or a rejected
    /// state, and propagates errors from URI parsing, component lookup,
    /// endpoint creation and consumer creation.
    ///
    /// # Panics
    /// Panics if the registry mutex is poisoned.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // State guard: decide from the two task handles whether starting is allowed.
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            if consumer_running && pipeline_running {
                return Ok(());
            }
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
        }

        info!(route_id = %route_id, "Starting route");

        // Copy out what the spawned tasks need so the map borrow ends here.
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        // Build the consumer while holding the registry lock; the lock is
        // released explicitly as soon as the consumer exists.
        let parsed = parse_uri(&from_uri)?;
        let registry = self
            .registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock");
        let component = registry.get_or_err(&parsed.scheme)?;
        let endpoint = component.create_endpoint(&from_uri)?;
        let mut consumer = endpoint.create_consumer()?;
        let consumer_concurrency = consumer.concurrency_model();
        drop(registry);

        // A route-level concurrency setting overrides the consumer's own model.
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Bounded channel (capacity 256) carrying envelopes from consumer to pipeline.
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        // Child tokens: cancelling the route's stored tokens cancels these too.
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        // Extra sender kept on the managed route (stored as `channel_sender` below).
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Aggregate-split routes: dedicated loop; `effective_concurrency` is not
        // consulted in this branch.
        if let Some(split) = managed.aggregate_split.as_ref() {
            // Channel handed to the aggregator service; its receiving end is
            // drained by the pipeline loop and fed to the post pipeline.
            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);

            let route_cancel_clone = pipeline_cancel.clone();
            let svc = AggregatorService::new(
                split.agg_config.clone(),
                late_tx,
                Arc::clone(&self.languages),
                route_cancel_clone,
            );
            let agg = Arc::new(std::sync::Mutex::new(svc));

            // Keep a handle on the route so it can be force-completed when stopping.
            managed.agg_service = Some(Arc::clone(&agg));

            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
            let pre_pipeline = Arc::clone(&split.pre_pipeline);
            let post_pipeline = Arc::clone(&split.post_pipeline);

            let route_id_for_consumer = route_id.to_string();
            // Consumer task: on failure, log, notify the crash channel (if any)
            // and publish a runtime failure.
            let consumer_handle = tokio::spawn(async move {
                if let Err(e) = consumer.start(consumer_ctx).await {
                    error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
                    let error_msg = e.to_string();
                    if let Some(tx) = crash_notifier {
                        let _ = tx
                            .send(CrashNotification {
                                route_id: route_id_for_consumer.clone(),
                                error: error_msg.clone(),
                            })
                            .await;
                    }
                    publish_runtime_failure(
                        runtime_for_consumer,
                        &route_id_for_consumer,
                        &error_msg,
                    )
                    .await;
                }
            });

            let pipeline_handle = tokio::spawn(async move {
                loop {
                    tokio::select! {
                        // `biased` polls the branches in the order written:
                        // late exchanges, then new envelopes, then cancellation.
                        biased;

                        late_ex = async {
                            let mut rx = late_rx.lock().await;
                            rx.recv().await
                        } => {
                            match late_ex {
                                Some(ex) => {
                                    let pipe = post_pipeline.load();
                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
                                    }
                                }
                                // Late channel closed: end the pipeline task.
                                None => return,
                            }
                        }

                        envelope_opt = rx.recv() => {
                            match envelope_opt {
                                Some(envelope) => {
                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
                                    // Stage 1: pre pipeline; a failure is reported to the
                                    // caller (if any) and the loop moves on.
                                    let pre_pipe = pre_pipeline.load();
                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
                                        Ok(ex) => ex,
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                            continue;
                                        }
                                    };

                                    // Stage 2: aggregator. The service is cloned so the
                                    // std mutex guard is released before awaiting.
                                    let ex = {
                                        let cloned_svc = agg.lock().unwrap().clone();
                                        cloned_svc.oneshot(ex).await
                                    };

                                    match ex {
                                        Ok(ex) => {
                                            // Stage 3: exchanges not flagged by `is_pending`
                                            // go through the post pipeline; flagged ones are
                                            // returned to the caller as-is.
                                            // NOTE(review): `is_pending` presumably marks an
                                            // exchange still held by the aggregator; confirm.
                                            if !is_pending(&ex) {
                                                let post_pipe = post_pipeline.load();
                                                let out = post_pipe.0.clone().oneshot(ex).await;
                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
                                            } else if let Some(tx) = reply_tx {
                                                let _ = tx.send(Ok(ex));
                                            }
                                        }
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                        }
                                    }
                                }
                                // Envelope channel closed: end the pipeline task.
                                None => return,
                            }
                        }

                        _ = pipeline_cancel.cancelled() => {
                            // On cancellation: force-complete the aggregator, then run
                            // whatever is already queued on the late channel through
                            // the post pipeline (results ignored) before exiting.
                            {
                                let guard = agg.lock().unwrap();
                                guard.force_complete_all();
                            }
                            let mut rx_guard = late_rx.lock().await;
                            while let Ok(late_ex) = rx_guard.try_recv() {
                                let pipe = post_pipeline.load();
                                let _ = pipe.0.clone().oneshot(late_ex).await;
                            }
                            break;
                        }
                    }
                }
            });

            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist");
            managed.consumer_handle = Some(consumer_handle);
            managed.pipeline_handle = Some(pipeline_handle);
            managed.channel_sender = Some(tx_for_storage);

            info!(route_id = %route_id, "Route started (aggregate with timeout)");
            return Ok(());
        }
        let route_id_for_consumer = route_id.to_string();
        // Consumer task for non-aggregate routes; same failure handling as above.
        let consumer_handle = tokio::spawn(async move {
            if let Err(e) = consumer.start(consumer_ctx).await {
                error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
                let error_msg = e.to_string();

                if let Some(tx) = crash_notifier {
                    let _ = tx
                        .send(CrashNotification {
                            route_id: route_id_for_consumer.clone(),
                            error: error_msg.clone(),
                        })
                        .await;
                }

                publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
                    .await;
            }
        });

        let pipeline_handle = match effective_concurrency {
            // Sequential: one exchange at a time, processed inside the loop task.
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    loop {
                        // Wait for the next envelope; exit on channel close or cancel.
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return,
                            },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Load the pipeline per exchange so a swapped pipeline is picked up.
                        let mut pipeline = pipeline.load().0.clone();

                        // A readiness failure is reported to the caller (if any) and
                        // ends the whole pipeline task.
                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        // With a reply channel the result goes to the caller; otherwise
                        // errors other than `CamelError::Stopped` are logged.
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            // Concurrent: each envelope is processed in its own spawned task.
            ConcurrencyModel::Concurrent { max } => {
                // Optional semaphore with `max` permits; `None` means no limit here.
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return,
                            },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        tokio::spawn(async move {
                            // The permit is acquired inside the per-exchange task and
                            // held until that task finishes.
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            let mut pipe = pipe_ref.load().0.clone();

                            // Here a readiness failure ends only this exchange's task.
                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };

        // Record the running tasks and the retained sender on the route.
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }
1704
1705 async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1706 self.stop_route_internal(route_id).await
1707 }
1708
1709 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1710 self.stop_route(route_id).await?;
1711 tokio::time::sleep(Duration::from_millis(100)).await;
1712 self.start_route(route_id).await
1713 }
1714
1715 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1716 let managed = self
1718 .routes
1719 .get_mut(route_id)
1720 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1721
1722 let consumer_running = handle_is_running(&managed.consumer_handle);
1723 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1724
1725 if !consumer_running || !pipeline_running {
1727 return Err(CamelError::RouteError(format!(
1728 "Cannot suspend route '{}' with execution lifecycle {}",
1729 route_id,
1730 inferred_lifecycle_label(managed)
1731 )));
1732 }
1733
1734 info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1735
1736 let managed = self
1738 .routes
1739 .get_mut(route_id)
1740 .expect("invariant: route must exist after prior existence check");
1741 managed.consumer_cancel_token.cancel();
1742
1743 let managed = self
1745 .routes
1746 .get_mut(route_id)
1747 .expect("invariant: route must exist after prior existence check");
1748 let consumer_handle = managed.consumer_handle.take();
1749
1750 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1752 if let Some(handle) = consumer_handle {
1753 let _ = handle.await;
1754 }
1755 })
1756 .await;
1757
1758 if timeout_result.is_err() {
1759 warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1760 }
1761
1762 let managed = self
1764 .routes
1765 .get_mut(route_id)
1766 .expect("invariant: route must exist after prior existence check");
1767
1768 managed.consumer_cancel_token = CancellationToken::new();
1770
1771 info!(route_id = %route_id, "Route suspended (pipeline still running)");
1772 Ok(())
1773 }
1774
1775 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1776 let managed = self
1778 .routes
1779 .get(route_id)
1780 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1781
1782 let consumer_running = handle_is_running(&managed.consumer_handle);
1783 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1784 if consumer_running || !pipeline_running {
1785 return Err(CamelError::RouteError(format!(
1786 "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1787 route_id,
1788 inferred_lifecycle_label(managed)
1789 )));
1790 }
1791
1792 let sender = managed.channel_sender.clone().ok_or_else(|| {
1794 CamelError::RouteError("Suspended route has no channel sender".into())
1795 })?;
1796
1797 let from_uri = managed.from_uri.clone();
1799
1800 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1801
1802 let parsed = parse_uri(&from_uri)?;
1804 let registry = self
1805 .registry
1806 .lock()
1807 .expect("mutex poisoned: another thread panicked while holding this lock");
1808 let component = registry.get_or_err(&parsed.scheme)?;
1809 let endpoint = component.create_endpoint(&from_uri)?;
1810 let mut consumer = endpoint.create_consumer()?;
1811 drop(registry);
1813
1814 let managed = self
1816 .routes
1817 .get_mut(route_id)
1818 .expect("invariant: route must exist after prior existence check");
1819
1820 let consumer_cancel = managed.consumer_cancel_token.child_token();
1822
1823 let crash_notifier = self.crash_notifier.clone();
1824 let runtime_for_consumer = self.runtime.clone();
1825
1826 let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1828
1829 let route_id_for_consumer = route_id.to_string();
1831 let consumer_handle = tokio::spawn(async move {
1832 if let Err(e) = consumer.start(consumer_ctx).await {
1833 error!(route_id = %route_id_for_consumer, "Consumer error on resume: {e}");
1834 let error_msg = e.to_string();
1835
1836 if let Some(tx) = crash_notifier {
1838 let _ = tx
1839 .send(CrashNotification {
1840 route_id: route_id_for_consumer.clone(),
1841 error: error_msg.clone(),
1842 })
1843 .await;
1844 }
1845
1846 publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1847 .await;
1848 }
1849 });
1850
1851 let managed = self
1853 .routes
1854 .get_mut(route_id)
1855 .expect("invariant: route must exist after prior existence check");
1856 managed.consumer_handle = Some(consumer_handle);
1857
1858 info!(route_id = %route_id, "Route resumed");
1859 Ok(())
1860 }
1861
1862 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1863 let route_ids: Vec<String> = {
1866 let mut pairs: Vec<_> = self
1867 .routes
1868 .iter()
1869 .filter(|(_, r)| r.definition.auto_startup())
1870 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1871 .collect();
1872 pairs.sort_by_key(|(_, order)| *order);
1873 pairs.into_iter().map(|(id, _)| id).collect()
1874 };
1875
1876 info!("Starting {} auto-startup routes", route_ids.len());
1877
1878 let mut errors: Vec<String> = Vec::new();
1880 for route_id in route_ids {
1881 if let Err(e) = self.start_route(&route_id).await {
1882 errors.push(format!("Route '{}': {}", route_id, e));
1883 }
1884 }
1885
1886 if !errors.is_empty() {
1887 return Err(CamelError::RouteError(format!(
1888 "Failed to start routes: {}",
1889 errors.join(", ")
1890 )));
1891 }
1892
1893 info!("All auto-startup routes started");
1894 Ok(())
1895 }
1896
1897 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1898 let route_ids: Vec<String> = {
1900 let mut pairs: Vec<_> = self
1901 .routes
1902 .iter()
1903 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1904 .collect();
1905 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1906 pairs.into_iter().map(|(id, _)| id).collect()
1907 };
1908
1909 info!("Stopping {} routes", route_ids.len());
1910
1911 for route_id in route_ids {
1912 let _ = self.stop_route(&route_id).await;
1913 }
1914
1915 info!("All routes stopped");
1916 Ok(())
1917 }
1918}
1919
1920#[async_trait::async_trait]
1921impl RouteControllerInternal for DefaultRouteController {
1922 fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
1923 DefaultRouteController::add_route(self, def)
1924 }
1925
1926 fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError> {
1927 DefaultRouteController::swap_pipeline(self, route_id, pipeline)
1928 }
1929
1930 fn route_from_uri(&self, route_id: &str) -> Option<String> {
1931 DefaultRouteController::route_from_uri(self, route_id)
1933 }
1934
1935 fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
1936 DefaultRouteController::set_error_handler(self, config)
1937 }
1938
1939 fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
1940 DefaultRouteController::set_self_ref(self, self_ref)
1941 }
1942
1943 fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
1944 DefaultRouteController::set_runtime_handle(self, runtime)
1945 }
1946
1947 fn route_count(&self) -> usize {
1948 DefaultRouteController::route_count(self)
1949 }
1950
1951 fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1952 DefaultRouteController::in_flight_count(self, route_id)
1953 }
1954
1955 fn route_exists(&self, route_id: &str) -> bool {
1956 DefaultRouteController::route_exists(self, route_id)
1957 }
1958
1959 fn route_ids(&self) -> Vec<String> {
1960 DefaultRouteController::route_ids(self)
1961 }
1962
1963 fn auto_startup_route_ids(&self) -> Vec<String> {
1964 DefaultRouteController::auto_startup_route_ids(self)
1965 }
1966
1967 fn shutdown_route_ids(&self) -> Vec<String> {
1968 DefaultRouteController::shutdown_route_ids(self)
1969 }
1970
1971 fn set_tracer_config(&mut self, config: &TracerConfig) {
1972 DefaultRouteController::set_tracer_config(self, config)
1973 }
1974
1975 fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError> {
1976 DefaultRouteController::compile_route_definition(self, def)
1977 }
1978
1979 fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1980 DefaultRouteController::remove_route(self, route_id)
1981 }
1982
1983 async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1984 DefaultRouteController::start_route(self, route_id).await
1985 }
1986
1987 async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1988 DefaultRouteController::stop_route(self, route_id).await
1989 }
1990}
1991
1992#[cfg(test)]
1993mod tests {
1994 use super::*;
1995 use crate::shared::components::domain::Registry;
1996
1997 fn build_controller() -> DefaultRouteController {
1998 DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())))
1999 }
2000
2001 fn build_controller_with_components() -> DefaultRouteController {
2002 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2003 {
2004 let mut guard = registry.lock().expect("registry lock");
2005 guard.register(camel_component_timer::TimerComponent::new());
2006 guard.register(camel_component_mock::MockComponent::new());
2007 guard.register(camel_component_log::LogComponent::new());
2008 }
2009 DefaultRouteController::new(registry)
2010 }
2011
2012 fn set_self_ref(controller: &mut DefaultRouteController) {
2013 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2014 let other: Arc<Mutex<dyn RouteController>> =
2015 Arc::new(Mutex::new(DefaultRouteController::new(registry)));
2016 controller.set_self_ref(other);
2017 }
2018
2019 fn register_simple_language(controller: &mut DefaultRouteController) {
2020 controller.languages.lock().expect("languages lock").insert(
2021 "simple".into(),
2022 Arc::new(camel_language_simple::SimpleLanguage),
2023 );
2024 }
2025
2026 #[test]
2027 fn test_route_controller_internal_is_object_safe() {
2028 let _: Option<Box<dyn RouteControllerInternal>> = None;
2029 }
2030
2031 #[test]
2032 fn helper_functions_cover_non_async_branches() {
2033 let managed = ManagedRoute {
2034 definition: RouteDefinition::new("timer:a", vec![])
2035 .with_route_id("r")
2036 .to_info(),
2037 from_uri: "timer:a".into(),
2038 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(BoxProcessor::new(
2039 IdentityProcessor,
2040 )))),
2041 concurrency: None,
2042 consumer_handle: None,
2043 pipeline_handle: None,
2044 consumer_cancel_token: CancellationToken::new(),
2045 pipeline_cancel_token: CancellationToken::new(),
2046 channel_sender: None,
2047 in_flight: None,
2048 aggregate_split: None,
2049 agg_service: None,
2050 };
2051
2052 assert_eq!(inferred_lifecycle_label(&managed), "Stopped");
2053 assert!(!handle_is_running(&managed.consumer_handle));
2054
2055 let cmd = runtime_failure_command("route-x", "boom");
2056 match cmd {
2057 RuntimeCommand::FailRoute {
2058 route_id, error, ..
2059 } => {
2060 assert_eq!(route_id, "route-x");
2061 assert_eq!(error, "boom");
2062 }
2063 _ => panic!("expected FailRoute command"),
2064 }
2065 }
2066
2067 #[test]
2068 fn add_route_detects_duplicates() {
2069 let mut controller = build_controller();
2070 set_self_ref(&mut controller);
2071
2072 controller
2073 .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
2074 .expect("add route");
2075
2076 let dup_err = controller
2077 .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
2078 .expect_err("duplicate must fail");
2079 assert!(dup_err.to_string().contains("already exists"));
2080 }
2081
2082 #[test]
2083 fn route_introspection_and_ordering_helpers_work() {
2084 let mut controller = build_controller();
2085 set_self_ref(&mut controller);
2086
2087 controller
2088 .add_route(
2089 RouteDefinition::new("timer:a", vec![])
2090 .with_route_id("a")
2091 .with_startup_order(20),
2092 )
2093 .unwrap();
2094 controller
2095 .add_route(
2096 RouteDefinition::new("timer:b", vec![])
2097 .with_route_id("b")
2098 .with_startup_order(10),
2099 )
2100 .unwrap();
2101 controller
2102 .add_route(
2103 RouteDefinition::new("timer:c", vec![])
2104 .with_route_id("c")
2105 .with_auto_startup(false)
2106 .with_startup_order(5),
2107 )
2108 .unwrap();
2109
2110 assert_eq!(controller.route_count(), 3);
2111 assert_eq!(controller.route_from_uri("a"), Some("timer:a".into()));
2112 assert!(controller.route_ids().contains(&"a".to_string()));
2113 assert_eq!(
2114 controller.auto_startup_route_ids(),
2115 vec!["b".to_string(), "a".to_string()]
2116 );
2117 assert_eq!(
2118 controller.shutdown_route_ids(),
2119 vec!["a".to_string(), "b".to_string(), "c".to_string()]
2120 );
2121 }
2122
2123 #[test]
2124 fn swap_pipeline_and_remove_route_behaviors() {
2125 let mut controller = build_controller();
2126 set_self_ref(&mut controller);
2127
2128 controller
2129 .add_route(RouteDefinition::new("timer:a", vec![]).with_route_id("swap"))
2130 .unwrap();
2131
2132 controller
2133 .swap_pipeline("swap", BoxProcessor::new(IdentityProcessor))
2134 .unwrap();
2135 assert!(controller.get_pipeline("swap").is_some());
2136
2137 controller.remove_route("swap").unwrap();
2138 assert_eq!(controller.route_count(), 0);
2139
2140 let err = controller
2141 .remove_route("swap")
2142 .expect_err("missing route must fail");
2143 assert!(err.to_string().contains("not found"));
2144 }
2145
2146 #[test]
2147 fn resolve_steps_covers_declarative_and_eip_variants() {
2148 use camel_api::LanguageExpressionDef;
2149 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2150
2151 let mut controller = build_controller_with_components();
2152 set_self_ref(&mut controller);
2153 register_simple_language(&mut controller);
2154
2155 let expr = |source: &str| LanguageExpressionDef {
2156 language: "simple".into(),
2157 source: source.into(),
2158 };
2159
2160 let steps = vec![
2161 BuilderStep::To("mock:out".into()),
2162 BuilderStep::Stop,
2163 BuilderStep::Log {
2164 level: camel_processor::LogLevel::Info,
2165 message: "log".into(),
2166 },
2167 BuilderStep::DeclarativeSetHeader {
2168 key: "k".into(),
2169 value: ValueSourceDef::Literal(Value::String("v".into())),
2170 },
2171 BuilderStep::DeclarativeSetHeader {
2172 key: "k2".into(),
2173 value: ValueSourceDef::Expression(expr("${body}")),
2174 },
2175 BuilderStep::DeclarativeSetBody {
2176 value: ValueSourceDef::Expression(expr("${body}")),
2177 },
2178 BuilderStep::DeclarativeFilter {
2179 predicate: expr("${body} != null"),
2180 steps: vec![BuilderStep::Stop],
2181 },
2182 BuilderStep::DeclarativeChoice {
2183 whens: vec![
2184 crate::lifecycle::application::route_definition::DeclarativeWhenStep {
2185 predicate: expr("${body} == 'x'"),
2186 steps: vec![BuilderStep::Stop],
2187 },
2188 ],
2189 otherwise: Some(vec![BuilderStep::Stop]),
2190 },
2191 BuilderStep::DeclarativeScript {
2192 expression: expr("${body}"),
2193 },
2194 BuilderStep::Split {
2195 config: SplitterConfig::new(split_body_lines())
2196 .aggregation(AggregationStrategy::CollectAll),
2197 steps: vec![BuilderStep::Stop],
2198 },
2199 BuilderStep::DeclarativeSplit {
2200 expression: expr("${body}"),
2201 aggregation: AggregationStrategy::Original,
2202 parallel: false,
2203 parallel_limit: Some(2),
2204 stop_on_exception: true,
2205 steps: vec![BuilderStep::Stop],
2206 },
2207 BuilderStep::Aggregate {
2208 config: camel_api::AggregatorConfig::correlate_by("id")
2209 .complete_when_size(1)
2210 .build(),
2211 },
2212 BuilderStep::Filter {
2213 predicate: Arc::new(|_| true),
2214 steps: vec![BuilderStep::Stop],
2215 },
2216 BuilderStep::Choice {
2217 whens: vec![crate::lifecycle::application::route_definition::WhenStep {
2218 predicate: Arc::new(|_| true),
2219 steps: vec![BuilderStep::Stop],
2220 }],
2221 otherwise: Some(vec![BuilderStep::Stop]),
2222 },
2223 BuilderStep::WireTap {
2224 uri: "mock:tap".into(),
2225 },
2226 BuilderStep::Multicast {
2227 steps: vec![
2228 BuilderStep::To("mock:m1".into()),
2229 BuilderStep::To("mock:m2".into()),
2230 ],
2231 config: camel_api::MulticastConfig::new(),
2232 },
2233 BuilderStep::DeclarativeLog {
2234 level: camel_processor::LogLevel::Info,
2235 message: ValueSourceDef::Expression(expr("${body}")),
2236 },
2237 BuilderStep::Throttle {
2238 config: camel_api::ThrottlerConfig::new(10, Duration::from_millis(100)),
2239 steps: vec![BuilderStep::To("mock:t".into())],
2240 },
2241 BuilderStep::LoadBalance {
2242 config: camel_api::LoadBalancerConfig::round_robin(),
2243 steps: vec![
2244 BuilderStep::To("mock:l1".into()),
2245 BuilderStep::To("mock:l2".into()),
2246 ],
2247 },
2248 BuilderStep::DynamicRouter {
2249 config: camel_api::DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
2250 },
2251 BuilderStep::RoutingSlip {
2252 config: camel_api::RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
2253 },
2254 ];
2255
2256 let producer_ctx = ProducerContext::new();
2257 let resolved = controller
2258 .resolve_steps(steps, &producer_ctx, &controller.registry)
2259 .expect("resolve should succeed");
2260 assert!(!resolved.is_empty());
2261 }
2262
/// Exercises three rejection paths of `resolve_steps` on a controller that has
/// the `simple` language registered:
///
/// * a `Script` step in `simple` fails with a message containing
///   "does not support";
/// * a `Bean` step naming an unregistered bean fails with "Bean not found";
/// * a `DeclarativeScript` step in an unregistered language fails with
///   "not registered".
#[test]
fn resolve_steps_script_requires_mutating_language_support() {
    use camel_api::LanguageExpressionDef;

    let mut controller = build_controller_with_components();
    set_self_ref(&mut controller);
    register_simple_language(&mut controller);

    // Case 1: a `simple` script step must be refused.
    let script_step = BuilderStep::Script {
        language: "simple".into(),
        script: "${body}".into(),
    };
    let script_outcome = controller.resolve_steps(
        vec![script_step],
        &ProducerContext::new(),
        &controller.registry,
    );
    let script_message = script_outcome
        .expect_err("simple script should fail for mutating expression")
        .to_string();
    assert!(script_message.contains("does not support"));

    // Case 2: a bean that was never registered must be reported as missing.
    let bean_step = BuilderStep::Bean {
        name: "unknown".into(),
        method: "run".into(),
    };
    let bean_outcome = controller.resolve_steps(
        vec![bean_step],
        &ProducerContext::new(),
        &controller.registry,
    );
    let bean_message = bean_outcome.expect_err("missing bean must fail").to_string();
    assert!(bean_message.contains("Bean not found"));

    // Case 3: a declarative script in a language nobody registered must fail.
    let unknown_language_expr = LanguageExpressionDef {
        language: "unknown".into(),
        source: "x".into(),
    };
    let declarative_step = BuilderStep::DeclarativeScript {
        expression: unknown_language_expr,
    };
    let declarative_outcome = controller.resolve_steps(
        vec![declarative_step],
        &ProducerContext::new(),
        &controller.registry,
    );
    let language_message = declarative_outcome
        .expect_err("unknown language must fail")
        .to_string();
    assert!(language_message.contains("not registered"));
}
2305
2306 #[tokio::test]
2307 async fn lifecycle_methods_report_missing_routes() {
2308 let mut controller = build_controller();
2309
2310 assert!(controller.start_route("missing").await.is_err());
2311 assert!(controller.stop_route("missing").await.is_err());
2312 assert!(controller.suspend_route("missing").await.is_err());
2313 assert!(controller.resume_route("missing").await.is_err());
2314 }
2315
2316 #[tokio::test]
2317 async fn start_stop_route_happy_path_with_timer_and_mock() {
2318 let mut controller = build_controller_with_components();
2319 set_self_ref(&mut controller);
2320
2321 let route = RouteDefinition::new(
2322 "timer:tick?period=10&repeatCount=1",
2323 vec![BuilderStep::To("mock:out".into())],
2324 )
2325 .with_route_id("rt-1");
2326 controller.add_route(route).unwrap();
2327
2328 controller.start_route("rt-1").await.unwrap();
2329 tokio::time::sleep(Duration::from_millis(40)).await;
2330 controller.stop_route("rt-1").await.unwrap();
2331
2332 controller.remove_route("rt-1").unwrap();
2333 }
2334
2335 #[tokio::test]
2336 async fn suspend_resume_and_restart_cover_execution_transitions() {
2337 let mut controller = build_controller_with_components();
2338 set_self_ref(&mut controller);
2339
2340 let route = RouteDefinition::new(
2341 "timer:tick?period=30",
2342 vec![BuilderStep::To("mock:out".into())],
2343 )
2344 .with_route_id("rt-2");
2345 controller.add_route(route).unwrap();
2346
2347 controller.start_route("rt-2").await.unwrap();
2348 controller.suspend_route("rt-2").await.unwrap();
2349 controller.resume_route("rt-2").await.unwrap();
2350 controller.restart_route("rt-2").await.unwrap();
2351 controller.stop_route("rt-2").await.unwrap();
2352 }
2353
2354 #[tokio::test]
2355 async fn remove_route_rejects_running_route() {
2356 let mut controller = build_controller_with_components();
2357 set_self_ref(&mut controller);
2358
2359 let route = RouteDefinition::new(
2360 "timer:tick?period=25",
2361 vec![BuilderStep::To("mock:out".into())],
2362 )
2363 .with_route_id("rt-running");
2364 controller.add_route(route).unwrap();
2365 controller.start_route("rt-running").await.unwrap();
2366
2367 let err = controller
2368 .remove_route("rt-running")
2369 .expect_err("running route removal must fail");
2370 assert!(err.to_string().contains("must be stopped before removal"));
2371
2372 controller.stop_route("rt-running").await.unwrap();
2373 controller.remove_route("rt-running").unwrap();
2374 }
2375
2376 #[tokio::test]
2377 async fn start_route_on_suspended_state_returns_guidance_error() {
2378 let mut controller = build_controller_with_components();
2379 set_self_ref(&mut controller);
2380
2381 let route = RouteDefinition::new(
2382 "timer:tick?period=40",
2383 vec![BuilderStep::To("mock:out".into())],
2384 )
2385 .with_route_id("rt-suspend");
2386 controller.add_route(route).unwrap();
2387
2388 controller.start_route("rt-suspend").await.unwrap();
2389 controller.suspend_route("rt-suspend").await.unwrap();
2390
2391 let err = controller
2392 .start_route("rt-suspend")
2393 .await
2394 .expect_err("start from suspended must fail");
2395 assert!(err.to_string().contains("use resume_route"));
2396
2397 controller.resume_route("rt-suspend").await.unwrap();
2398 controller.stop_route("rt-suspend").await.unwrap();
2399 }
2400
2401 #[tokio::test]
2402 async fn suspend_and_resume_validate_execution_state() {
2403 let mut controller = build_controller_with_components();
2404 set_self_ref(&mut controller);
2405
2406 controller
2407 .add_route(
2408 RouteDefinition::new("timer:tick?period=50", vec![]).with_route_id("rt-state"),
2409 )
2410 .unwrap();
2411
2412 let suspend_err = controller
2413 .suspend_route("rt-state")
2414 .await
2415 .expect_err("suspend before start must fail");
2416 assert!(suspend_err.to_string().contains("Cannot suspend route"));
2417
2418 controller.start_route("rt-state").await.unwrap();
2419 let resume_err = controller
2420 .resume_route("rt-state")
2421 .await
2422 .expect_err("resume while started must fail");
2423 assert!(resume_err.to_string().contains("Cannot resume route"));
2424
2425 controller.stop_route("rt-state").await.unwrap();
2426 }
2427
2428 #[tokio::test]
2429 async fn concurrent_concurrency_override_path_executes() {
2430 let mut controller = build_controller_with_components();
2431 set_self_ref(&mut controller);
2432
2433 let route = RouteDefinition::new(
2434 "timer:tick?period=10&repeatCount=2",
2435 vec![BuilderStep::To("mock:out".into())],
2436 )
2437 .with_route_id("rt-concurrent")
2438 .with_concurrency(ConcurrencyModel::Concurrent { max: Some(2) });
2439
2440 controller.add_route(route).unwrap();
2441 controller.start_route("rt-concurrent").await.unwrap();
2442 tokio::time::sleep(Duration::from_millis(50)).await;
2443 controller.stop_route("rt-concurrent").await.unwrap();
2444 }
2445
2446 #[tokio::test]
2447 async fn add_route_with_circuit_breaker_and_error_handler_compiles() {
2448 use camel_api::circuit_breaker::CircuitBreakerConfig;
2449 use camel_api::error_handler::ErrorHandlerConfig;
2450
2451 let mut controller = build_controller_with_components();
2452 set_self_ref(&mut controller);
2453
2454 let route = RouteDefinition::new("timer:tick?period=25", vec![BuilderStep::Stop])
2455 .with_route_id("rt-eh")
2456 .with_circuit_breaker(CircuitBreakerConfig::new())
2457 .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"));
2458
2459 controller
2460 .add_route(route)
2461 .expect("route with layers should compile");
2462 controller.start_route("rt-eh").await.unwrap();
2463 controller.stop_route("rt-eh").await.unwrap();
2464 }
2465
2466 #[tokio::test]
2467 async fn compile_and_swap_errors_for_missing_route() {
2468 let mut controller = build_controller_with_components();
2469 set_self_ref(&mut controller);
2470
2471 let compiled = controller
2472 .compile_route_definition(
2473 RouteDefinition::new("timer:tick?period=10", vec![BuilderStep::Stop])
2474 .with_route_id("compiled"),
2475 )
2476 .expect("compile should work");
2477
2478 let err = controller
2479 .swap_pipeline("nope", compiled)
2480 .expect_err("missing route swap must fail");
2481 assert!(err.to_string().contains("not found"));
2482 }
2483}