1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10
11const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::sync::CancellationToken;
15use tower::{Layer, Service, ServiceExt};
16use tracing::{error, info, warn};
17
18use camel_api::UnitOfWorkConfig;
19use camel_api::aggregator::AggregatorConfig;
20use camel_api::error_handler::ErrorHandlerConfig;
21use camel_api::metrics::MetricsCollector;
22use camel_api::{
23 BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
24 NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeCommand,
25 RuntimeHandle,
26};
27use camel_auth::TokenAuthenticator;
28use camel_component_api::{
29 ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
30};
31use camel_endpoint::parse_uri;
32pub use camel_processor::aggregator::SharedLanguageRegistry;
33use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
34use camel_processor::circuit_breaker::CircuitBreakerLayer;
35use camel_processor::error_handler::ErrorHandlerLayer;
36use camel_processor::security_policy_layer::SecurityPolicyLayer;
37
38use crate::health_registry::HealthCheckRegistry;
39use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
40use crate::lifecycle::adapters::route_compiler::{
41 compose_pipeline, compose_traced_pipeline_with_contracts,
42};
43use crate::lifecycle::application::route_definition::{
44 BuilderStep, RouteDefinition, RouteDefinitionInfo,
45};
46use crate::shared::components::domain::Registry;
47use crate::shared::observability::domain::{DetailLevel, TracerConfig};
48use arc_swap::ArcSwap;
49use camel_bean::BeanRegistry;
50
51#[derive(Debug, Clone)]
53pub struct CrashNotification {
54 pub route_id: String,
56 pub error: String,
58}
59
60pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
75unsafe impl Sync for SyncBoxProcessor {}
76
77type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
78
79#[cfg(test)]
80type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;
81
82#[cfg(test)]
83static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
84 std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
85
86#[cfg(test)]
87fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
88 *START_ROUTE_EVENT_HOOK
89 .lock()
90 .expect("start route event hook lock") = hook;
91}
92
93#[cfg(test)]
94fn emit_start_route_event(event: &'static str) {
95 if let Some(hook) = START_ROUTE_EVENT_HOOK
96 .lock()
97 .expect("start route event hook lock")
98 .as_ref()
99 {
100 hook(event);
101 }
102}
103
104pub(super) struct AggregateSplitInfo {
106 pub(super) pre_pipeline: SharedPipeline,
107 pub(super) agg_config: AggregatorConfig,
108 pub(super) post_pipeline: SharedPipeline,
109}
110
111pub(super) struct ManagedRoute {
112 pub(super) definition: RouteDefinitionInfo,
114 pub(super) from_uri: String,
116 pub(super) pipeline: SharedPipeline,
118 pub(super) concurrency: Option<ConcurrencyModel>,
120 pub(super) consumer_handle: Option<JoinHandle<()>>,
122 pub(super) pipeline_handle: Option<JoinHandle<()>>,
124 pub(super) consumer_cancel_token: CancellationToken,
127 pub(super) pipeline_cancel_token: CancellationToken,
130 pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
133 pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
135 pub(super) aggregate_split: Option<AggregateSplitInfo>,
136 pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
137 pub(super) security_policy: Option<camel_api::security_policy::SecurityPolicyConfig>,
139 pub(super) security_authenticator: Option<Arc<dyn TokenAuthenticator>>,
141}
142
143pub(crate) struct PreparedRoute {
144 pub(crate) route_id: String,
145 pub(super) managed: ManagedRoute,
146}
147
148pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
149 handle.as_ref().is_some_and(|h| !h.is_finished())
150}
151
152fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
153 match (
154 handle_is_running(&managed.consumer_handle),
155 handle_is_running(&managed.pipeline_handle),
156 ) {
157 (true, true) => "Started",
158 (false, true) => "Suspended",
159 (true, false) => "Stopping",
160 (false, false) => "Stopped",
161 }
162}
163
164fn find_top_level_aggregate_requiring_split(
165 steps: &[BuilderStep],
166) -> Option<(usize, AggregatorConfig)> {
167 for (i, step) in steps.iter().enumerate() {
168 if let BuilderStep::Aggregate { config } = step {
169 if has_timeout_condition(&config.completion) || config.force_completion_on_stop {
170 return Some((i, config.clone()));
171 }
172 break;
173 }
174 }
175 None
176}
177
178pub(crate) struct ControllerComponentContext {
179 registry: Arc<std::sync::Mutex<Registry>>,
180 languages: SharedLanguageRegistry,
181 metrics: Arc<dyn MetricsCollector>,
182 platform_service: Arc<dyn PlatformService>,
183 health_registry: Arc<HealthCheckRegistry>,
184 route_id: Option<String>,
185}
186
187impl ControllerComponentContext {
188 pub(crate) fn new(
189 registry: Arc<std::sync::Mutex<Registry>>,
190 languages: SharedLanguageRegistry,
191 metrics: Arc<dyn MetricsCollector>,
192 platform_service: Arc<dyn PlatformService>,
193 health_registry: Arc<HealthCheckRegistry>,
194 route_id: Option<String>,
195 ) -> Self {
196 Self {
197 registry,
198 languages,
199 metrics,
200 platform_service,
201 health_registry,
202 route_id,
203 }
204 }
205}
206
207impl ComponentContext for ControllerComponentContext {
208 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
209 self.registry.lock().ok()?.get(scheme)
210 }
211
212 fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
213 self.languages.lock().ok()?.get(name).cloned()
214 }
215
216 fn metrics(&self) -> Arc<dyn MetricsCollector> {
217 Arc::clone(&self.metrics)
218 }
219
220 fn platform_service(&self) -> Arc<dyn PlatformService> {
221 Arc::clone(&self.platform_service)
222 }
223
224 fn register_route_health_check(
225 &self,
226 route_id: &str,
227 check: Arc<dyn camel_api::AsyncHealthCheck>,
228 ) {
229 self.health_registry.register_for_route(route_id, check);
230 }
231
232 fn unregister_route_health_check(&self, route_id: &str) {
233 self.health_registry.unregister_for_route(route_id);
234 }
235
236 fn route_id(&self) -> Option<&str> {
237 self.route_id.as_deref()
238 }
239}
240
241fn is_pending(ex: &Exchange) -> bool {
242 ex.property("CamelAggregatorPending")
243 .and_then(|v| v.as_bool())
244 .unwrap_or(false)
245}
246
247async fn ready_with_backoff(
254 pipeline: &mut BoxProcessor,
255 cancel: &CancellationToken,
256) -> Result<(), CamelError> {
257 loop {
258 match pipeline.ready().await {
259 Ok(_) => return Ok(()),
260 Err(CamelError::CircuitOpen(ref msg)) => {
261 warn!("Circuit open, backing off: {msg}");
262 tokio::select! {
263 _ = tokio::time::sleep(Duration::from_secs(1)) => {
264 continue;
265 }
266 _ = cancel.cancelled() => {
267 return Err(CamelError::CircuitOpen(msg.clone()));
269 }
270 }
271 }
272 Err(e) => {
273 error!("Pipeline not ready: {e}");
275 return Err(e);
276 }
277 }
278 }
279}
280
281fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
282 let stamp = std::time::SystemTime::now()
283 .duration_since(std::time::UNIX_EPOCH)
284 .unwrap_or_default()
285 .as_nanos();
286 RuntimeCommand::FailRoute {
287 route_id: route_id.to_string(),
288 error: error.to_string(),
289 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
290 causation_id: None,
291 }
292}
293
294pub(super) async fn publish_runtime_failure(
295 runtime: Option<Weak<dyn RuntimeHandle>>,
296 route_id: &str,
297 error: &str,
298) {
299 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
300 return;
301 };
302 let command = runtime_failure_command(route_id, error);
303 if let Err(runtime_error) = runtime.execute(command).await {
304 warn!(
305 route_id = %route_id,
306 error = %runtime_error,
307 "failed to synchronize route crash with runtime projection"
308 );
309 }
310}
311
312pub struct DefaultRouteController {
320 routes: HashMap<String, ManagedRoute>,
322 registry: Arc<std::sync::Mutex<Registry>>,
324 languages: SharedLanguageRegistry,
326 beans: Arc<std::sync::Mutex<BeanRegistry>>,
328 runtime: Option<Weak<dyn RuntimeHandle>>,
330 global_error_handler: Option<ErrorHandlerConfig>,
332 crash_notifier: Option<mpsc::Sender<CrashNotification>>,
334 tracing_enabled: bool,
336 tracer_detail_level: DetailLevel,
338 tracer_metrics: Option<Arc<dyn MetricsCollector>>,
340 platform_service: Arc<dyn PlatformService>,
341 function_invoker: Option<Arc<dyn FunctionInvoker>>,
342 health_registry: Option<Arc<HealthCheckRegistry>>,
343}
344
345impl DefaultRouteController {
346 fn health_registry(&self) -> Arc<HealthCheckRegistry> {
347 self.health_registry.clone().unwrap_or_else(|| {
348 warn!("health_registry not configured — creating isolated fallback");
349 Arc::new(HealthCheckRegistry::new(Duration::from_secs(5)))
350 })
351 }
352
353 pub fn new(
355 registry: Arc<std::sync::Mutex<Registry>>,
356 platform_service: Arc<dyn PlatformService>,
357 ) -> Self {
358 Self::with_beans_and_platform_service(
359 registry,
360 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
361 platform_service,
362 )
363 }
364
365 pub fn with_beans(
367 registry: Arc<std::sync::Mutex<Registry>>,
368 beans: Arc<std::sync::Mutex<BeanRegistry>>,
369 ) -> Self {
370 Self::with_beans_and_platform_service(
371 registry,
372 beans,
373 Arc::new(NoopPlatformService::default()),
374 )
375 }
376
377 fn with_beans_and_platform_service(
378 registry: Arc<std::sync::Mutex<Registry>>,
379 beans: Arc<std::sync::Mutex<BeanRegistry>>,
380 platform_service: Arc<dyn PlatformService>,
381 ) -> Self {
382 Self {
383 routes: HashMap::new(),
384 registry,
385 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
386 beans,
387 runtime: None,
388 global_error_handler: None,
389 crash_notifier: None,
390 tracing_enabled: false,
391 tracer_detail_level: DetailLevel::Minimal,
392 tracer_metrics: None,
393 platform_service,
394 function_invoker: None,
395 health_registry: None,
396 }
397 }
398
399 pub fn with_languages(
401 registry: Arc<std::sync::Mutex<Registry>>,
402 languages: SharedLanguageRegistry,
403 platform_service: Arc<dyn PlatformService>,
404 ) -> Self {
405 Self {
406 routes: HashMap::new(),
407 registry,
408 languages,
409 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
410 runtime: None,
411 global_error_handler: None,
412 crash_notifier: None,
413 tracing_enabled: false,
414 tracer_detail_level: DetailLevel::Minimal,
415 tracer_metrics: None,
416 platform_service,
417 function_invoker: None,
418 health_registry: None,
419 }
420 }
421
422 pub fn with_languages_and_beans(
423 registry: Arc<std::sync::Mutex<Registry>>,
424 languages: SharedLanguageRegistry,
425 platform_service: Arc<dyn PlatformService>,
426 beans: Arc<std::sync::Mutex<BeanRegistry>>,
427 ) -> Self {
428 Self {
429 routes: HashMap::new(),
430 registry,
431 languages,
432 beans,
433 runtime: None,
434 global_error_handler: None,
435 crash_notifier: None,
436 tracing_enabled: false,
437 tracer_detail_level: DetailLevel::Minimal,
438 tracer_metrics: None,
439 platform_service,
440 function_invoker: None,
441 health_registry: None,
442 }
443 }
444
445 pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
446 self.function_invoker = Some(function_invoker);
447 self
448 }
449
450 pub fn set_health_registry(&mut self, registry: Arc<HealthCheckRegistry>) {
451 self.health_registry = Some(registry);
452 }
453
454 pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
455 self.function_invoker = Some(invoker);
456 }
457
458 pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
460 self.runtime = Some(Arc::downgrade(&runtime));
461 }
462
463 pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
468 self.crash_notifier = Some(tx);
469 }
470
471 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
473 self.global_error_handler = Some(config);
474 }
475
476 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
478 self.tracing_enabled = config.enabled;
479 self.tracer_detail_level = config.detail_level.clone();
480 self.tracer_metrics = config.metrics_collector.clone();
481 }
482
483 fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
484 let mut producer_ctx = ProducerContext::new();
485 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
486 producer_ctx = producer_ctx.with_runtime(runtime);
487 }
488 Ok(producer_ctx)
489 }
490
491 fn resolve_error_handler(
493 &self,
494 config: ErrorHandlerConfig,
495 producer_ctx: &ProducerContext,
496 rt: Arc<dyn camel_component_api::RuntimeObservability>,
497 component_ctx: &dyn ComponentContext,
498 ) -> Result<ErrorHandlerLayer, CamelError> {
499 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
501 let parsed = parse_uri(uri)?;
502 let component = component_ctx
503 .resolve_component(&parsed.scheme)
504 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
505 let endpoint = component.create_endpoint(uri, component_ctx)?;
506 Some(endpoint.create_producer(Arc::clone(&rt), producer_ctx)?)
507 } else {
508 None
509 };
510
511 let mut resolved_policies = Vec::new();
513 for policy in config.policies {
514 let handler_producer = if let Some(ref uri) = policy.handled_by {
515 let parsed = parse_uri(uri)?;
516 let component = component_ctx
517 .resolve_component(&parsed.scheme)
518 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
519 let endpoint = component.create_endpoint(uri, component_ctx)?;
520 Some(endpoint.create_producer(Arc::clone(&rt), producer_ctx)?)
521 } else {
522 None
523 };
524 resolved_policies.push((policy, handler_producer));
525 }
526
527 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
528 }
529
530 fn resolve_uow_layer(
533 &self,
534 config: &UnitOfWorkConfig,
535 producer_ctx: &ProducerContext,
536 rt: Arc<dyn camel_component_api::RuntimeObservability>,
537 component_ctx: &dyn ComponentContext,
538 counter: Option<Arc<AtomicU64>>,
539 ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
540 let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
541 let parsed = parse_uri(uri)?;
542 let component = component_ctx
543 .resolve_component(&parsed.scheme)
544 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
545 let endpoint = component.create_endpoint(uri, component_ctx)?;
546 endpoint
547 .create_producer(Arc::clone(&rt), producer_ctx)
548 .map_err(|e| {
549 CamelError::RouteError(format!(
550 "UoW hook URI '{uri}' could not be resolved: {e}"
551 ))
552 })
553 };
554
555 let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
556 let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
557
558 let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
559 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
560 Ok((layer, counter))
561 }
562
563 pub(crate) fn resolve_steps(
565 &self,
566 steps: Vec<BuilderStep>,
567 producer_ctx: &ProducerContext,
568 registry: &Arc<std::sync::Mutex<Registry>>,
569 route_id: Option<&str>,
570 staging_mode: &super::step_resolution::FunctionStagingMode,
571 ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
572 let component_ctx = Arc::new(ControllerComponentContext::new(
573 Arc::clone(registry),
574 Arc::clone(&self.languages),
575 self.tracer_metrics
576 .clone()
577 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
578 Arc::clone(&self.platform_service),
579 self.health_registry(),
580 route_id.map(|s| s.to_string()),
581 ));
582 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
583 Arc::clone(&component_ctx) as Arc<_>;
584
585 super::step_resolution::resolve_steps(
586 steps,
587 producer_ctx,
588 rt,
589 registry,
590 &self.languages,
591 &self.beans,
592 self.function_invoker.clone(),
593 component_ctx,
594 route_id,
595 staging_mode,
596 )
597 }
598
599 pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
609 let route_id = definition.route_id().to_string();
610
611 if self.routes.contains_key(&route_id) {
612 return Err(CamelError::RouteError(format!(
613 "Route '{}' already exists",
614 route_id
615 )));
616 }
617
618 info!(route_id = %route_id, "Adding route to controller");
619
620 let prepared = match self.build_managed_route(
621 definition,
622 &super::step_resolution::FunctionStagingMode::DirectAdd,
623 ) {
624 Ok(prepared) => prepared,
625 Err(err) => {
626 self.discard_function_staging();
627 return Err(err);
628 }
629 };
630
631 if let Some(invoker) = &self.function_invoker
632 && let Err(err) = invoker.commit_staged().await
633 {
634 invoker.discard_staging(0);
635 return Err(CamelError::Config(err.to_string()));
636 }
637
638 self.routes
639 .insert(prepared.route_id.clone(), prepared.managed);
640
641 Ok(())
642 }
643
644 fn build_managed_route(
645 &self,
646 definition: RouteDefinition,
647 staging_mode: &super::step_resolution::FunctionStagingMode,
648 ) -> Result<PreparedRoute, CamelError> {
649 let route_id = definition.route_id().to_string();
650
651 let definition_info = definition.to_info();
652 let RouteDefinition {
653 from_uri,
654 steps,
655 error_handler,
656 circuit_breaker,
657 security_policy,
658 security_authenticator,
659 unit_of_work,
660 concurrency,
661 ..
662 } = definition;
663
664 let producer_ctx = self.build_producer_context()?;
665
666 let mut aggregate_split: Option<AggregateSplitInfo> = None;
667 let processors_with_contracts = match find_top_level_aggregate_requiring_split(&steps) {
668 Some((idx, agg_config)) => {
669 let mut pre_steps = steps;
670 let mut rest = pre_steps.split_off(idx);
671 let _agg_step = rest.remove(0);
672 let post_steps = rest;
673
674 let pre_pairs = self.resolve_steps(
675 pre_steps,
676 &producer_ctx,
677 &self.registry,
678 Some(&route_id),
679 staging_mode,
680 )?;
681 let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
682 let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
683 compose_pipeline(pre_procs),
684 )));
685
686 let post_pairs = self.resolve_steps(
687 post_steps,
688 &producer_ctx,
689 &self.registry,
690 Some(&route_id),
691 staging_mode,
692 )?;
693 let post_procs: Vec<BoxProcessor> =
694 post_pairs.into_iter().map(|(p, _)| p).collect();
695 let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
696 compose_pipeline(post_procs),
697 )));
698
699 aggregate_split = Some(AggregateSplitInfo {
700 pre_pipeline,
701 agg_config,
702 post_pipeline,
703 });
704
705 vec![]
706 }
707 None => self.resolve_steps(
708 steps,
709 &producer_ctx,
710 &self.registry,
711 Some(&route_id),
712 staging_mode,
713 )?,
714 };
715 let route_id_for_tracing = route_id.clone();
716 let mut pipeline = if processors_with_contracts.is_empty() {
717 BoxProcessor::new(IdentityProcessor)
718 } else {
719 compose_traced_pipeline_with_contracts(
720 processors_with_contracts,
721 &route_id_for_tracing,
722 self.tracing_enabled,
723 self.tracer_detail_level.clone(),
724 self.tracer_metrics.clone(),
725 )
726 };
727
728 if let Some(cb_config) = circuit_breaker {
729 let cb_layer = CircuitBreakerLayer::new(cb_config);
730 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
731 }
732
733 if let Some(sp_config) = security_policy.clone() {
734 let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
735 pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
736 }
737
738 let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
739
740 if let Some(config) = eh_config {
741 let component_ctx = Arc::new(ControllerComponentContext::new(
742 Arc::clone(&self.registry),
743 Arc::clone(&self.languages),
744 self.tracer_metrics
745 .clone()
746 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
747 Arc::clone(&self.platform_service),
748 self.health_registry(),
749 Some(route_id.clone()),
750 ));
751 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
752 Arc::clone(&component_ctx) as Arc<_>;
753 let layer =
754 self.resolve_error_handler(config, &producer_ctx, rt, component_ctx.as_ref())?;
755 pipeline = BoxProcessor::new(layer.layer(pipeline));
756 }
757
758 let uow_counter = if let Some(uow_config) = &unit_of_work {
759 let component_ctx = Arc::new(ControllerComponentContext::new(
760 Arc::clone(&self.registry),
761 Arc::clone(&self.languages),
762 self.tracer_metrics
763 .clone()
764 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
765 Arc::clone(&self.platform_service),
766 self.health_registry(),
767 Some(route_id.clone()),
768 ));
769 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
770 Arc::clone(&component_ctx) as Arc<_>;
771 let (uow_layer, counter) = self.resolve_uow_layer(
772 uow_config,
773 &producer_ctx,
774 rt,
775 component_ctx.as_ref(),
776 None,
777 )?;
778 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
779 Some(counter)
780 } else {
781 None
782 };
783
784 Ok(PreparedRoute {
785 route_id,
786 managed: ManagedRoute {
787 definition: definition_info,
788 from_uri,
789 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
790 concurrency,
791 consumer_handle: None,
792 pipeline_handle: None,
793 consumer_cancel_token: CancellationToken::new(),
794 pipeline_cancel_token: CancellationToken::new(),
795 channel_sender: None,
796 in_flight: uow_counter,
797 aggregate_split,
798 agg_service: None,
799 security_policy,
800 security_authenticator,
801 },
802 })
803 }
804
805 pub(crate) fn insert_prepared_route(
806 &mut self,
807 prepared: PreparedRoute,
808 ) -> Result<(), CamelError> {
809 if self.routes.contains_key(&prepared.route_id) {
810 return Err(CamelError::RouteError(format!(
811 "Route '{}' already exists",
812 prepared.route_id
813 )));
814 }
815 self.routes
816 .insert(prepared.route_id.clone(), prepared.managed);
817 Ok(())
818 }
819
820 pub async fn add_route_with_generation(
821 &mut self,
822 definition: RouteDefinition,
823 generation: u64,
824 ) -> Result<(), CamelError> {
825 let route_id = definition.route_id().to_string();
826
827 if self.routes.contains_key(&route_id) {
828 return Err(CamelError::RouteError(format!(
829 "Route '{}' already exists",
830 route_id
831 )));
832 }
833
834 info!(route_id = %route_id, generation, "Adding route to controller with generation");
835
836 let prepared = self.build_managed_route(
837 definition,
838 &super::step_resolution::FunctionStagingMode::HotReload { generation },
839 )?;
840
841 self.routes
842 .insert(prepared.route_id.clone(), prepared.managed);
843
844 Ok(())
845 }
846
847 pub(crate) fn prepare_route_definition_with_generation(
848 &self,
849 definition: RouteDefinition,
850 generation: u64,
851 ) -> Result<PreparedRoute, CamelError> {
852 self.build_managed_route(
853 definition,
854 &super::step_resolution::FunctionStagingMode::HotReload { generation },
855 )
856 }
857
858 pub async fn remove_route_preserving_functions(
859 &mut self,
860 route_id: &str,
861 ) -> Result<(), CamelError> {
862 let managed = self.routes.get(route_id).ok_or_else(|| {
863 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
864 })?;
865 if handle_is_running(&managed.consumer_handle)
866 || handle_is_running(&managed.pipeline_handle)
867 {
868 return Err(CamelError::RouteError(format!(
869 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
870 route_id,
871 inferred_lifecycle_label(managed)
872 )));
873 }
874 self.routes.remove(route_id);
875 if let Some(reg) = &self.health_registry {
876 reg.unregister_for_route(route_id);
877 }
878 info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
879 Ok(())
880 }
881
882 pub fn compile_route_definition(
883 &self,
884 def: RouteDefinition,
885 ) -> Result<BoxProcessor, CamelError> {
886 let route_id = def.route_id().to_string();
887
888 let producer_ctx = self.build_producer_context()?;
889
890 let processors_with_contracts = self.resolve_steps(
891 def.steps,
892 &producer_ctx,
893 &self.registry,
894 Some(&route_id),
895 &super::step_resolution::FunctionStagingMode::DryCompile,
896 )?;
897 let mut pipeline = compose_traced_pipeline_with_contracts(
898 processors_with_contracts,
899 &route_id,
900 self.tracing_enabled,
901 self.tracer_detail_level.clone(),
902 self.tracer_metrics.clone(),
903 );
904
905 if let Some(cb_config) = def.circuit_breaker {
906 let cb_layer = CircuitBreakerLayer::new(cb_config);
907 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
908 }
909
910 if let Some(sp_config) = def.security_policy {
911 let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
912 pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
913 }
914
915 let eh_config = def
916 .error_handler
917 .clone()
918 .or_else(|| self.global_error_handler.clone());
919 if let Some(config) = eh_config {
920 let component_ctx = Arc::new(ControllerComponentContext::new(
921 Arc::clone(&self.registry),
922 Arc::clone(&self.languages),
923 self.tracer_metrics
924 .clone()
925 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
926 Arc::clone(&self.platform_service),
927 self.health_registry(),
928 Some(route_id.clone()),
929 ));
930 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
931 Arc::clone(&component_ctx) as Arc<_>;
932 let layer =
933 self.resolve_error_handler(config, &producer_ctx, rt, component_ctx.as_ref())?;
934 pipeline = BoxProcessor::new(layer.layer(pipeline));
935 }
936
937 if let Some(uow_config) = &def.unit_of_work {
939 let existing_counter = self
940 .routes
941 .get(&route_id)
942 .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
943
944 let component_ctx = Arc::new(ControllerComponentContext::new(
945 Arc::clone(&self.registry),
946 Arc::clone(&self.languages),
947 self.tracer_metrics
948 .clone()
949 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
950 Arc::clone(&self.platform_service),
951 self.health_registry(),
952 Some(route_id.clone()),
953 ));
954 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
955 Arc::clone(&component_ctx) as Arc<_>;
956
957 let (uow_layer, _counter) = self.resolve_uow_layer(
958 uow_config,
959 &producer_ctx,
960 rt,
961 component_ctx.as_ref(),
962 existing_counter,
963 )?;
964
965 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
966 }
967
968 Ok(pipeline)
969 }
970
971 pub fn compile_route_definition_with_generation(
972 &self,
973 def: RouteDefinition,
974 generation: u64,
975 ) -> Result<BoxProcessor, CamelError> {
976 let route_id = def.route_id().to_string();
977
978 let producer_ctx = self.build_producer_context()?;
979
980 let processors_with_contracts = self.resolve_steps(
981 def.steps,
982 &producer_ctx,
983 &self.registry,
984 Some(&route_id),
985 &super::step_resolution::FunctionStagingMode::HotReload { generation },
986 )?;
987 let mut pipeline = compose_traced_pipeline_with_contracts(
988 processors_with_contracts,
989 &route_id,
990 self.tracing_enabled,
991 self.tracer_detail_level.clone(),
992 self.tracer_metrics.clone(),
993 );
994
995 if let Some(cb_config) = def.circuit_breaker {
996 let cb_layer = CircuitBreakerLayer::new(cb_config);
997 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
998 }
999
1000 if let Some(sp_config) = def.security_policy {
1001 let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
1002 pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
1003 }
1004
1005 let eh_config = def
1006 .error_handler
1007 .clone()
1008 .or_else(|| self.global_error_handler.clone());
1009 if let Some(config) = eh_config {
1010 let component_ctx = Arc::new(ControllerComponentContext::new(
1011 Arc::clone(&self.registry),
1012 Arc::clone(&self.languages),
1013 self.tracer_metrics
1014 .clone()
1015 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1016 Arc::clone(&self.platform_service),
1017 self.health_registry(),
1018 Some(route_id.clone()),
1019 ));
1020 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
1021 Arc::clone(&component_ctx) as Arc<_>;
1022 let layer =
1023 self.resolve_error_handler(config, &producer_ctx, rt, component_ctx.as_ref())?;
1024 pipeline = BoxProcessor::new(layer.layer(pipeline));
1025 }
1026
1027 if let Some(uow_config) = &def.unit_of_work {
1028 let existing_counter = self
1029 .routes
1030 .get(&route_id)
1031 .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
1032
1033 let component_ctx = Arc::new(ControllerComponentContext::new(
1034 Arc::clone(&self.registry),
1035 Arc::clone(&self.languages),
1036 self.tracer_metrics
1037 .clone()
1038 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1039 Arc::clone(&self.platform_service),
1040 self.health_registry(),
1041 Some(route_id.clone()),
1042 ));
1043 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
1044 Arc::clone(&component_ctx) as Arc<_>;
1045
1046 let (uow_layer, _counter) = self.resolve_uow_layer(
1047 uow_config,
1048 &producer_ctx,
1049 rt,
1050 component_ctx.as_ref(),
1051 existing_counter,
1052 )?;
1053
1054 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1055 }
1056
1057 Ok(pipeline)
1058 }
1059
1060 pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1066 let managed = self.routes.get(route_id).ok_or_else(|| {
1067 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
1068 })?;
1069 if handle_is_running(&managed.consumer_handle)
1070 || handle_is_running(&managed.pipeline_handle)
1071 {
1072 return Err(CamelError::RouteError(format!(
1073 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
1074 route_id,
1075 inferred_lifecycle_label(managed)
1076 )));
1077 }
1078 if let Some(invoker) = &self.function_invoker {
1079 for (id, rid) in self.collect_function_refs(route_id) {
1080 if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
1081 warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
1082 }
1083 }
1084 }
1085 self.routes.remove(route_id);
1086 if let Some(reg) = &self.health_registry {
1087 reg.unregister_for_route(route_id);
1088 }
1089 info!(route_id = %route_id, "Route removed from controller");
1090 Ok(())
1091 }
1092
1093 fn collect_function_refs(
1094 &self,
1095 route_id: &str,
1096 ) -> Vec<(camel_api::FunctionId, Option<String>)> {
1097 self.function_invoker
1098 .as_ref()
1099 .map(|invoker| invoker.function_refs_for_route(route_id))
1100 .unwrap_or_default()
1101 }
1102
1103 fn discard_function_staging(&self) {
1104 if let Some(invoker) = &self.function_invoker {
1105 invoker.discard_staging(0);
1106 }
1107 }
1108
1109 pub fn route_count(&self) -> usize {
1111 self.routes.len()
1112 }
1113
1114 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1115 self.routes.get(route_id).map(|r| {
1116 r.in_flight
1117 .as_ref()
1118 .map_or(0, |c| c.load(Ordering::Relaxed))
1119 })
1120 }
1121
1122 pub fn route_exists(&self, route_id: &str) -> bool {
1124 self.routes.contains_key(route_id)
1125 }
1126
1127 pub fn route_ids(&self) -> Vec<String> {
1129 self.routes.keys().cloned().collect()
1130 }
1131
1132 pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
1133 self.routes
1134 .get(route_id)
1135 .and_then(|m| m.definition.source_hash())
1136 }
1137
1138 pub fn auto_startup_route_ids(&self) -> Vec<String> {
1140 let mut pairs: Vec<(String, i32)> = self
1141 .routes
1142 .iter()
1143 .filter(|(_, managed)| managed.definition.auto_startup())
1144 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1145 .collect();
1146 pairs.sort_by_key(|(_, order)| *order);
1147 pairs.into_iter().map(|(id, _)| id).collect()
1148 }
1149
1150 pub fn shutdown_route_ids(&self) -> Vec<String> {
1152 let mut pairs: Vec<(String, i32)> = self
1153 .routes
1154 .iter()
1155 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1156 .collect();
1157 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1158 pairs.into_iter().map(|(id, _)| id).collect()
1159 }
1160
1161 pub fn swap_pipeline(
1166 &self,
1167 route_id: &str,
1168 new_pipeline: BoxProcessor,
1169 ) -> Result<(), CamelError> {
1170 let managed = self
1171 .routes
1172 .get(route_id)
1173 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1174
1175 if managed.aggregate_split.is_some() {
1176 tracing::warn!(
1177 route_id = %route_id,
1178 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
1179 );
1180 }
1181
1182 managed
1183 .pipeline
1184 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1185 info!(route_id = %route_id, "Pipeline swapped atomically");
1186 Ok(())
1187 }
1188
1189 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1191 self.routes.get(route_id).map(|r| r.from_uri.clone())
1192 }
1193
1194 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1199 self.routes
1200 .get(route_id)
1201 .map(|r| r.pipeline.load().0.clone())
1202 }
1203
1204 async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1206 super::consumer_management::stop_route_internal(
1207 &mut self.routes,
1208 route_id,
1209 DEFAULT_SHUTDOWN_TIMEOUT,
1210 )
1211 .await
1212 }
1213
1214 pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1215 self.start_route(route_id).await
1216 }
1217
1218 pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1219 self.stop_route(route_id).await
1220 }
1221}
1222
1223#[async_trait::async_trait]
1224impl RouteController for DefaultRouteController {
1225 async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1226 {
1228 let managed = self
1229 .routes
1230 .get_mut(route_id)
1231 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1232
1233 let consumer_running = handle_is_running(&managed.consumer_handle);
1234 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1235 if consumer_running && pipeline_running {
1236 return Ok(());
1237 }
1238 if !consumer_running && pipeline_running {
1239 return Err(CamelError::RouteError(format!(
1240 "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
1241 route_id
1242 )));
1243 }
1244 if consumer_running && !pipeline_running {
1245 return Err(CamelError::RouteError(format!(
1246 "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
1247 route_id
1248 )));
1249 }
1250 }
1251
1252 info!(route_id = %route_id, "Starting route");
1253
1254 let (from_uri, pipeline, concurrency) = {
1256 let managed = self
1257 .routes
1258 .get(route_id)
1259 .expect("invariant: route must exist after prior existence check"); (
1261 managed.from_uri.clone(),
1262 Arc::clone(&managed.pipeline),
1263 managed.concurrency.clone(),
1264 )
1265 };
1266
1267 let crash_notifier = self.crash_notifier.clone();
1269 let runtime_for_consumer = self.runtime.clone();
1270
1271 let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
1272 Arc::clone(&self.registry),
1273 Arc::clone(&self.languages),
1274 self.tracer_metrics
1275 .clone()
1276 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1277 Arc::clone(&self.platform_service),
1278 self.health_registry(),
1279 Some(route_id.to_string()),
1280 ));
1281 let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
1282 Arc::clone(&consumer_component_ctx) as Arc<_>;
1283 let (mut consumer, consumer_concurrency) =
1284 super::consumer_management::create_route_consumer(
1285 consumer_rt,
1286 &self.registry,
1287 &from_uri,
1288 consumer_component_ctx.as_ref(),
1289 )?;
1290
1291 let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1293
1294 let managed = self
1296 .routes
1297 .get_mut(route_id)
1298 .expect("invariant: route must exist after prior existence check"); if let (Some(sp_config), Some(authenticator)) = (
1302 managed.security_policy.as_ref(),
1303 managed.security_authenticator.as_ref(),
1304 ) {
1305 use camel_component_api::SecurityContext;
1306 let sec_ctx =
1307 SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
1308 consumer.set_security_context(sec_ctx);
1309 }
1310
1311 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1313 let consumer_cancel = managed.consumer_cancel_token.child_token();
1315 let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1316 let tx_for_storage = tx.clone();
1318 let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1319
1320 let managed = self
1322 .routes
1323 .get_mut(route_id)
1324 .expect("invariant: route must exist after prior existence check"); if let Some(split) = managed.aggregate_split.as_ref() {
1327 let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
1328
1329 let route_cancel_clone = pipeline_cancel.clone();
1330 let svc = AggregatorService::new(
1331 split.agg_config.clone(),
1332 late_tx,
1333 Arc::clone(&self.languages),
1334 route_cancel_clone,
1335 );
1336 let agg = Arc::new(std::sync::Mutex::new(svc));
1337
1338 let pipeline_cancel_for_monitor = pipeline_cancel.clone();
1339 let agg_for_monitor = Arc::clone(&agg);
1340
1341 managed.agg_service = Some(Arc::clone(&agg));
1342
1343 let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
1344 let pre_pipeline = Arc::clone(&split.pre_pipeline);
1345 let post_pipeline = Arc::clone(&split.post_pipeline);
1346
1347 let pipeline_handle = tokio::spawn(async move {
1349 loop {
1350 tokio::select! {
1351 biased;
1352
1353 late_ex = async {
1354 let mut rx = late_rx.lock().await;
1355 rx.recv().await
1356 } => {
1357 match late_ex {
1358 Some(ex) => {
1359 let pipe = post_pipeline.load();
1360 if let Err(e) = pipe.0.clone().oneshot(ex).await {
1361 tracing::warn!(error = %e, "late exchange post-pipeline failed");
1362 }
1363 }
1364 None => return,
1365 }
1366 }
1367
1368 envelope_opt = rx.recv() => {
1369 match envelope_opt {
1370 Some(envelope) => {
1371 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1372 let pre_pipe = pre_pipeline.load();
1373 let ex = match pre_pipe.0.clone().oneshot(exchange).await {
1374 Ok(ex) => ex,
1375 Err(e) => {
1376 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1377 continue;
1378 }
1379 };
1380
1381 let ex = {
1382 let cloned_svc = agg
1383 .lock()
1384 .expect("mutex poisoned: another thread panicked while holding this lock") .clone();
1386 cloned_svc.oneshot(ex).await
1387 };
1388
1389 match ex {
1390 Ok(ex) => {
1391 if !is_pending(&ex) {
1392 let post_pipe = post_pipeline.load();
1393 let out = post_pipe.0.clone().oneshot(ex).await;
1394 if let Some(tx) = reply_tx { let _ = tx.send(out); }
1395 } else if let Some(tx) = reply_tx {
1396 let _ = tx.send(Ok(ex));
1397 }
1398 }
1399 Err(e) => {
1400 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1401 }
1402 }
1403 }
1404 None => return,
1405 }
1406 }
1407
1408 _ = pipeline_cancel.cancelled() => {
1409 {
1410 let guard = agg
1411 .lock()
1412 .expect("mutex poisoned: another thread panicked while holding this lock"); guard.force_complete_all();
1414 }
1415 let mut rx_guard = late_rx.lock().await;
1416 while let Ok(late_ex) = rx_guard.try_recv() {
1417 let pipe = post_pipeline.load();
1418 let _ = pipe.0.clone().oneshot(late_ex).await;
1419 }
1420 break;
1421 }
1422 }
1423 }
1424 });
1425 #[cfg(test)]
1426 emit_start_route_event("pipeline_spawned");
1427
1428 let consumer_handle = super::consumer_management::spawn_consumer_task(
1431 route_id.to_string(),
1432 consumer,
1433 consumer_ctx,
1434 crash_notifier,
1435 runtime_for_consumer,
1436 false,
1437 );
1438
1439 let force_on_stop = agg_for_monitor
1443 .lock()
1444 .expect("mutex poisoned: another thread panicked while holding this lock") .config()
1446 .force_completion_on_stop;
1447 let consumer_handle = tokio::spawn(async move {
1448 let _ = consumer_handle.await;
1449 if !pipeline_cancel_for_monitor.is_cancelled() {
1450 let guard = agg_for_monitor
1451 .lock()
1452 .expect("mutex poisoned: another thread panicked while holding this lock"); guard.force_complete_all();
1454 drop(guard);
1455 if force_on_stop {
1456 pipeline_cancel_for_monitor.cancel();
1457 }
1458 }
1459 });
1460 #[cfg(test)]
1461 emit_start_route_event("consumer_spawned");
1462
1463 let managed = self
1464 .routes
1465 .get_mut(route_id)
1466 .expect("invariant: route must exist"); managed.consumer_handle = Some(consumer_handle);
1468 managed.pipeline_handle = Some(pipeline_handle);
1469 managed.channel_sender = Some(tx_for_storage);
1470
1471 info!(route_id = %route_id, "Route started (aggregate with timeout)");
1472 return Ok(());
1473 }
1474 let pipeline_handle = match effective_concurrency {
1478 ConcurrencyModel::Sequential => {
1479 tokio::spawn(async move {
1480 loop {
1481 let envelope = tokio::select! {
1483 envelope = rx.recv() => match envelope {
1484 Some(e) => e,
1485 None => return, },
1487 _ = pipeline_cancel.cancelled() => {
1488 return;
1490 }
1491 };
1492 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1493
1494 let mut pipeline = pipeline.load().0.clone();
1496
1497 if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1498 if let Some(tx) = reply_tx {
1499 let _ = tx.send(Err(e));
1500 }
1501 return;
1502 }
1503
1504 let result = pipeline.call(exchange).await;
1505 if let Some(tx) = reply_tx {
1506 let _ = tx.send(result);
1507 } else if let Err(ref e) = result
1508 && !matches!(e, CamelError::Stopped)
1509 {
1510 error!("Pipeline error: {e}");
1512 }
1513 }
1514 })
1515 }
1516 ConcurrencyModel::Concurrent { max } => {
1517 let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1518 tokio::spawn(async move {
1519 loop {
1520 let envelope = tokio::select! {
1522 envelope = rx.recv() => match envelope {
1523 Some(e) => e,
1524 None => return, },
1526 _ = pipeline_cancel.cancelled() => {
1527 return;
1529 }
1530 };
1531 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1532 let pipe_ref = Arc::clone(&pipeline);
1533 let sem = sem.clone();
1534 let cancel = pipeline_cancel.clone();
1535 tokio::spawn(async move {
1536 let _permit = match &sem {
1538 Some(s) => Some(s.acquire().await.expect("semaphore closed")), None => None,
1540 };
1541
1542 let mut pipe = pipe_ref.load().0.clone();
1544
1545 if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1547 if let Some(tx) = reply_tx {
1548 let _ = tx.send(Err(e));
1549 }
1550 return;
1551 }
1552
1553 let result = pipe.call(exchange).await;
1554 if let Some(tx) = reply_tx {
1555 let _ = tx.send(result);
1556 } else if let Err(ref e) = result
1557 && !matches!(e, CamelError::Stopped)
1558 {
1559 error!("Pipeline error: {e}");
1561 }
1562 });
1563 }
1564 })
1565 }
1566 };
1567 #[cfg(test)]
1568 emit_start_route_event("pipeline_spawned");
1569
1570 let consumer_handle = super::consumer_management::spawn_consumer_task(
1573 route_id.to_string(),
1574 consumer,
1575 consumer_ctx,
1576 crash_notifier,
1577 runtime_for_consumer,
1578 false,
1579 );
1580 #[cfg(test)]
1581 emit_start_route_event("consumer_spawned");
1582
1583 let managed = self
1585 .routes
1586 .get_mut(route_id)
1587 .expect("invariant: route must exist after prior existence check"); managed.consumer_handle = Some(consumer_handle);
1589 managed.pipeline_handle = Some(pipeline_handle);
1590 managed.channel_sender = Some(tx_for_storage);
1591
1592 info!(route_id = %route_id, "Route started");
1593 Ok(())
1594 }
1595
1596 async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1597 self.stop_route_internal(route_id).await?;
1598 if let Some(reg) = &self.health_registry {
1599 reg.unregister_for_route(route_id);
1600 }
1601 Ok(())
1602 }
1603
1604 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1605 self.stop_route(route_id).await?;
1606 tokio::time::sleep(Duration::from_millis(100)).await;
1607 self.start_route(route_id).await
1608 }
1609
1610 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1611 let managed = self
1613 .routes
1614 .get_mut(route_id)
1615 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1616
1617 let consumer_running = handle_is_running(&managed.consumer_handle);
1618 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1619
1620 if !consumer_running || !pipeline_running {
1622 return Err(CamelError::RouteError(format!(
1623 "Cannot suspend route '{}' with execution lifecycle {}",
1624 route_id,
1625 inferred_lifecycle_label(managed)
1626 )));
1627 }
1628
1629 info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1630
1631 let managed = self
1633 .routes
1634 .get_mut(route_id)
1635 .expect("invariant: route must exist after prior existence check"); managed.consumer_cancel_token.cancel();
1637
1638 let managed = self
1640 .routes
1641 .get_mut(route_id)
1642 .expect("invariant: route must exist after prior existence check"); let consumer_handle = managed.consumer_handle.take();
1644
1645 let timeout_result = tokio::time::timeout(DEFAULT_SHUTDOWN_TIMEOUT, async {
1647 if let Some(handle) = consumer_handle {
1648 let _ = handle.await;
1649 }
1650 })
1651 .await;
1652
1653 if timeout_result.is_err() {
1654 warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1655 }
1656
1657 let managed = self
1659 .routes
1660 .get_mut(route_id)
1661 .expect("invariant: route must exist after prior existence check"); managed.consumer_cancel_token = CancellationToken::new();
1665
1666 info!(route_id = %route_id, "Route suspended (pipeline still running)");
1667 Ok(())
1668 }
1669
1670 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1671 let managed = self
1673 .routes
1674 .get(route_id)
1675 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1676
1677 let consumer_running = handle_is_running(&managed.consumer_handle);
1678 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1679 if consumer_running || !pipeline_running {
1680 return Err(CamelError::RouteError(format!(
1681 "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1682 route_id,
1683 inferred_lifecycle_label(managed)
1684 )));
1685 }
1686
1687 let sender = managed.channel_sender.clone().ok_or_else(|| {
1689 CamelError::RouteError("Suspended route has no channel sender".into())
1690 })?;
1691
1692 let from_uri = managed.from_uri.clone();
1694
1695 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1696
1697 let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
1698 Arc::clone(&self.registry),
1699 Arc::clone(&self.languages),
1700 self.tracer_metrics
1701 .clone()
1702 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1703 Arc::clone(&self.platform_service),
1704 self.health_registry(),
1705 Some(route_id.to_string()),
1706 ));
1707 let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
1708 Arc::clone(&consumer_component_ctx) as Arc<_>;
1709 let (mut consumer, _) = super::consumer_management::create_route_consumer(
1710 consumer_rt,
1711 &self.registry,
1712 &from_uri,
1713 consumer_component_ctx.as_ref(),
1714 )?;
1715
1716 let managed = self
1718 .routes
1719 .get(route_id)
1720 .expect("invariant: route must exist after prior existence check"); if let (Some(sp_config), Some(authenticator)) = (
1722 managed.security_policy.as_ref(),
1723 managed.security_authenticator.as_ref(),
1724 ) {
1725 use camel_component_api::SecurityContext;
1726 let sec_ctx =
1727 SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
1728 consumer.set_security_context(sec_ctx);
1729 }
1730
1731 let managed = self
1733 .routes
1734 .get_mut(route_id)
1735 .expect("invariant: route must exist after prior existence check"); let consumer_cancel = managed.consumer_cancel_token.child_token();
1739
1740 let crash_notifier = self.crash_notifier.clone();
1741 let runtime_for_consumer = self.runtime.clone();
1742
1743 let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1745
1746 let consumer_handle = super::consumer_management::spawn_consumer_task(
1748 route_id.to_string(),
1749 consumer,
1750 consumer_ctx,
1751 crash_notifier,
1752 runtime_for_consumer,
1753 true,
1754 );
1755
1756 let managed = self
1758 .routes
1759 .get_mut(route_id)
1760 .expect("invariant: route must exist after prior existence check"); managed.consumer_handle = Some(consumer_handle);
1762
1763 info!(route_id = %route_id, "Route resumed");
1764 Ok(())
1765 }
1766
1767 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1768 let route_ids: Vec<String> = {
1771 let mut pairs: Vec<_> = self
1772 .routes
1773 .iter()
1774 .filter(|(_, r)| r.definition.auto_startup())
1775 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1776 .collect();
1777 pairs.sort_by_key(|(_, order)| *order);
1778 pairs.into_iter().map(|(id, _)| id).collect()
1779 };
1780
1781 info!("Starting {} auto-startup routes", route_ids.len());
1782
1783 let mut errors: Vec<String> = Vec::new();
1785 for route_id in route_ids {
1786 if let Err(e) = self.start_route(&route_id).await {
1787 errors.push(format!("Route '{}': {}", route_id, e));
1788 }
1789 }
1790
1791 if !errors.is_empty() {
1792 return Err(CamelError::RouteError(format!(
1793 "Failed to start routes: {}",
1794 errors.join(", ")
1795 )));
1796 }
1797
1798 info!("All auto-startup routes started");
1799 Ok(())
1800 }
1801
1802 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1803 let route_ids: Vec<String> = {
1805 let mut pairs: Vec<_> = self
1806 .routes
1807 .iter()
1808 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1809 .collect();
1810 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1811 pairs.into_iter().map(|(id, _)| id).collect()
1812 };
1813
1814 info!("Stopping {} routes", route_ids.len());
1815
1816 for route_id in route_ids {
1817 let _ = self.stop_route(&route_id).await;
1818 }
1819
1820 info!("All routes stopped");
1821 Ok(())
1822 }
1823}
1824
1825#[cfg(test)]
1826#[path = "route_controller_tests.rs"]
1827mod tests;