1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10
11const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::sync::CancellationToken;
15use tower::{Layer, Service, ServiceExt};
16use tracing::{error, info, warn};
17
18use camel_api::UnitOfWorkConfig;
19use camel_api::aggregator::AggregatorConfig;
20use camel_api::error_handler::ErrorHandlerConfig;
21use camel_api::metrics::MetricsCollector;
22use camel_api::{
23 BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
24 NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeCommand,
25 RuntimeHandle,
26};
27use camel_auth::TokenAuthenticator;
28use camel_component_api::{
29 ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
30};
31use camel_endpoint::parse_uri;
32pub use camel_processor::aggregator::SharedLanguageRegistry;
33use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
34use camel_processor::circuit_breaker::CircuitBreakerLayer;
35use camel_processor::error_handler::ErrorHandlerLayer;
36use camel_processor::security_policy_layer::SecurityPolicyLayer;
37
38use crate::health_registry::HealthCheckRegistry;
39use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
40use crate::lifecycle::adapters::route_compiler::{
41 compose_pipeline, compose_traced_pipeline_with_contracts,
42};
43use crate::lifecycle::application::route_definition::{
44 BuilderStep, RouteDefinition, RouteDefinitionInfo,
45};
46use crate::shared::components::domain::Registry;
47use crate::shared::observability::domain::{DetailLevel, TracerConfig};
48use arc_swap::ArcSwap;
49use camel_bean::BeanRegistry;
50
51#[derive(Debug, Clone)]
53pub struct CrashNotification {
54 pub route_id: String,
56 pub error: String,
58}
59
60pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
75unsafe impl Sync for SyncBoxProcessor {}
76
77type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
78
79#[cfg(test)]
80type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;
81
82#[cfg(test)]
83static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
84 std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
85
86#[cfg(test)]
87fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
88 *START_ROUTE_EVENT_HOOK
89 .lock()
90 .expect("start route event hook lock") = hook;
91}
92
93#[cfg(test)]
94fn emit_start_route_event(event: &'static str) {
95 if let Some(hook) = START_ROUTE_EVENT_HOOK
96 .lock()
97 .expect("start route event hook lock")
98 .as_ref()
99 {
100 hook(event);
101 }
102}
103
104pub(super) struct AggregateSplitInfo {
106 pub(super) pre_pipeline: SharedPipeline,
107 pub(super) agg_config: AggregatorConfig,
108 pub(super) post_pipeline: SharedPipeline,
109}
110
111pub(super) struct ManagedRoute {
112 pub(super) definition: RouteDefinitionInfo,
114 pub(super) from_uri: String,
116 pub(super) pipeline: SharedPipeline,
118 pub(super) concurrency: Option<ConcurrencyModel>,
120 pub(super) consumer_handle: Option<JoinHandle<()>>,
122 pub(super) pipeline_handle: Option<JoinHandle<()>>,
124 pub(super) consumer_cancel_token: CancellationToken,
127 pub(super) pipeline_cancel_token: CancellationToken,
130 pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
133 pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
135 pub(super) aggregate_split: Option<AggregateSplitInfo>,
136 pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
137 pub(super) security_policy: Option<camel_api::security_policy::SecurityPolicyConfig>,
139 pub(super) security_authenticator: Option<Arc<dyn TokenAuthenticator>>,
141}
142
143pub(crate) struct PreparedRoute {
144 pub(crate) route_id: String,
145 pub(super) managed: ManagedRoute,
146}
147
148pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
149 handle.as_ref().is_some_and(|h| !h.is_finished())
150}
151
152fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
153 match (
154 handle_is_running(&managed.consumer_handle),
155 handle_is_running(&managed.pipeline_handle),
156 ) {
157 (true, true) => "Started",
158 (false, true) => "Suspended",
159 (true, false) => "Stopping",
160 (false, false) => "Stopped",
161 }
162}
163
164fn find_top_level_aggregate_with_timeout(
165 steps: &[BuilderStep],
166) -> Option<(usize, AggregatorConfig)> {
167 for (i, step) in steps.iter().enumerate() {
168 if let BuilderStep::Aggregate { config } = step {
169 if has_timeout_condition(&config.completion) {
170 return Some((i, config.clone()));
171 }
172 break;
173 }
174 }
175 None
176}
177
178pub(crate) struct ControllerComponentContext {
179 registry: Arc<std::sync::Mutex<Registry>>,
180 languages: SharedLanguageRegistry,
181 metrics: Arc<dyn MetricsCollector>,
182 platform_service: Arc<dyn PlatformService>,
183 health_registry: Arc<HealthCheckRegistry>,
184 route_id: Option<String>,
185}
186
187impl ControllerComponentContext {
188 pub(crate) fn new(
189 registry: Arc<std::sync::Mutex<Registry>>,
190 languages: SharedLanguageRegistry,
191 metrics: Arc<dyn MetricsCollector>,
192 platform_service: Arc<dyn PlatformService>,
193 health_registry: Arc<HealthCheckRegistry>,
194 route_id: Option<String>,
195 ) -> Self {
196 Self {
197 registry,
198 languages,
199 metrics,
200 platform_service,
201 health_registry,
202 route_id,
203 }
204 }
205}
206
207impl ComponentContext for ControllerComponentContext {
208 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
209 self.registry.lock().ok()?.get(scheme)
210 }
211
212 fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
213 self.languages.lock().ok()?.get(name).cloned()
214 }
215
216 fn metrics(&self) -> Arc<dyn MetricsCollector> {
217 Arc::clone(&self.metrics)
218 }
219
220 fn platform_service(&self) -> Arc<dyn PlatformService> {
221 Arc::clone(&self.platform_service)
222 }
223
224 fn register_route_health_check(
225 &self,
226 route_id: &str,
227 check: Arc<dyn camel_api::AsyncHealthCheck>,
228 ) {
229 self.health_registry.register_for_route(route_id, check);
230 }
231
232 fn unregister_route_health_check(&self, route_id: &str) {
233 self.health_registry.unregister_for_route(route_id);
234 }
235
236 fn route_id(&self) -> Option<&str> {
237 self.route_id.as_deref()
238 }
239}
240
241fn is_pending(ex: &Exchange) -> bool {
242 ex.property("CamelAggregatorPending")
243 .and_then(|v| v.as_bool())
244 .unwrap_or(false)
245}
246
247async fn ready_with_backoff(
254 pipeline: &mut BoxProcessor,
255 cancel: &CancellationToken,
256) -> Result<(), CamelError> {
257 loop {
258 match pipeline.ready().await {
259 Ok(_) => return Ok(()),
260 Err(CamelError::CircuitOpen(ref msg)) => {
261 warn!("Circuit open, backing off: {msg}");
262 tokio::select! {
263 _ = tokio::time::sleep(Duration::from_secs(1)) => {
264 continue;
265 }
266 _ = cancel.cancelled() => {
267 return Err(CamelError::CircuitOpen(msg.clone()));
269 }
270 }
271 }
272 Err(e) => {
273 error!("Pipeline not ready: {e}");
274 return Err(e);
275 }
276 }
277 }
278}
279
280fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
281 let stamp = std::time::SystemTime::now()
282 .duration_since(std::time::UNIX_EPOCH)
283 .unwrap_or_default()
284 .as_nanos();
285 RuntimeCommand::FailRoute {
286 route_id: route_id.to_string(),
287 error: error.to_string(),
288 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
289 causation_id: None,
290 }
291}
292
293pub(super) async fn publish_runtime_failure(
294 runtime: Option<Weak<dyn RuntimeHandle>>,
295 route_id: &str,
296 error: &str,
297) {
298 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
299 return;
300 };
301 let command = runtime_failure_command(route_id, error);
302 if let Err(runtime_error) = runtime.execute(command).await {
303 warn!(
304 route_id = %route_id,
305 error = %runtime_error,
306 "failed to synchronize route crash with runtime projection"
307 );
308 }
309}
310
311pub struct DefaultRouteController {
319 routes: HashMap<String, ManagedRoute>,
321 registry: Arc<std::sync::Mutex<Registry>>,
323 languages: SharedLanguageRegistry,
325 beans: Arc<std::sync::Mutex<BeanRegistry>>,
327 runtime: Option<Weak<dyn RuntimeHandle>>,
329 global_error_handler: Option<ErrorHandlerConfig>,
331 crash_notifier: Option<mpsc::Sender<CrashNotification>>,
333 tracing_enabled: bool,
335 tracer_detail_level: DetailLevel,
337 tracer_metrics: Option<Arc<dyn MetricsCollector>>,
339 platform_service: Arc<dyn PlatformService>,
340 function_invoker: Option<Arc<dyn FunctionInvoker>>,
341 health_registry: Option<Arc<HealthCheckRegistry>>,
342}
343
344impl DefaultRouteController {
345 fn health_registry(&self) -> Arc<HealthCheckRegistry> {
346 self.health_registry.clone().unwrap_or_else(|| {
347 warn!("health_registry not configured — creating isolated fallback");
348 Arc::new(HealthCheckRegistry::new(Duration::from_secs(5)))
349 })
350 }
351
352 pub fn new(
354 registry: Arc<std::sync::Mutex<Registry>>,
355 platform_service: Arc<dyn PlatformService>,
356 ) -> Self {
357 Self::with_beans_and_platform_service(
358 registry,
359 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
360 platform_service,
361 )
362 }
363
364 pub fn with_beans(
366 registry: Arc<std::sync::Mutex<Registry>>,
367 beans: Arc<std::sync::Mutex<BeanRegistry>>,
368 ) -> Self {
369 Self::with_beans_and_platform_service(
370 registry,
371 beans,
372 Arc::new(NoopPlatformService::default()),
373 )
374 }
375
376 fn with_beans_and_platform_service(
377 registry: Arc<std::sync::Mutex<Registry>>,
378 beans: Arc<std::sync::Mutex<BeanRegistry>>,
379 platform_service: Arc<dyn PlatformService>,
380 ) -> Self {
381 Self {
382 routes: HashMap::new(),
383 registry,
384 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
385 beans,
386 runtime: None,
387 global_error_handler: None,
388 crash_notifier: None,
389 tracing_enabled: false,
390 tracer_detail_level: DetailLevel::Minimal,
391 tracer_metrics: None,
392 platform_service,
393 function_invoker: None,
394 health_registry: None,
395 }
396 }
397
398 pub fn with_languages(
400 registry: Arc<std::sync::Mutex<Registry>>,
401 languages: SharedLanguageRegistry,
402 platform_service: Arc<dyn PlatformService>,
403 ) -> Self {
404 Self {
405 routes: HashMap::new(),
406 registry,
407 languages,
408 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
409 runtime: None,
410 global_error_handler: None,
411 crash_notifier: None,
412 tracing_enabled: false,
413 tracer_detail_level: DetailLevel::Minimal,
414 tracer_metrics: None,
415 platform_service,
416 function_invoker: None,
417 health_registry: None,
418 }
419 }
420
421 pub fn with_languages_and_beans(
422 registry: Arc<std::sync::Mutex<Registry>>,
423 languages: SharedLanguageRegistry,
424 platform_service: Arc<dyn PlatformService>,
425 beans: Arc<std::sync::Mutex<BeanRegistry>>,
426 ) -> Self {
427 Self {
428 routes: HashMap::new(),
429 registry,
430 languages,
431 beans,
432 runtime: None,
433 global_error_handler: None,
434 crash_notifier: None,
435 tracing_enabled: false,
436 tracer_detail_level: DetailLevel::Minimal,
437 tracer_metrics: None,
438 platform_service,
439 function_invoker: None,
440 health_registry: None,
441 }
442 }
443
444 pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
445 self.function_invoker = Some(function_invoker);
446 self
447 }
448
449 pub fn set_health_registry(&mut self, registry: Arc<HealthCheckRegistry>) {
450 self.health_registry = Some(registry);
451 }
452
453 pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
454 self.function_invoker = Some(invoker);
455 }
456
457 pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
459 self.runtime = Some(Arc::downgrade(&runtime));
460 }
461
462 pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
467 self.crash_notifier = Some(tx);
468 }
469
470 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
472 self.global_error_handler = Some(config);
473 }
474
475 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
477 self.tracing_enabled = config.enabled;
478 self.tracer_detail_level = config.detail_level.clone();
479 self.tracer_metrics = config.metrics_collector.clone();
480 }
481
482 fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
483 let mut producer_ctx = ProducerContext::new();
484 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
485 producer_ctx = producer_ctx.with_runtime(runtime);
486 }
487 Ok(producer_ctx)
488 }
489
490 fn resolve_error_handler(
492 &self,
493 config: ErrorHandlerConfig,
494 producer_ctx: &ProducerContext,
495 component_ctx: &dyn ComponentContext,
496 ) -> Result<ErrorHandlerLayer, CamelError> {
497 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
499 let parsed = parse_uri(uri)?;
500 let component = component_ctx
501 .resolve_component(&parsed.scheme)
502 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
503 let endpoint = component.create_endpoint(uri, component_ctx)?;
504 Some(endpoint.create_producer(producer_ctx)?)
505 } else {
506 None
507 };
508
509 let mut resolved_policies = Vec::new();
511 for policy in config.policies {
512 let handler_producer = if let Some(ref uri) = policy.handled_by {
513 let parsed = parse_uri(uri)?;
514 let component = component_ctx
515 .resolve_component(&parsed.scheme)
516 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
517 let endpoint = component.create_endpoint(uri, component_ctx)?;
518 Some(endpoint.create_producer(producer_ctx)?)
519 } else {
520 None
521 };
522 resolved_policies.push((policy, handler_producer));
523 }
524
525 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
526 }
527
528 fn resolve_uow_layer(
531 &self,
532 config: &UnitOfWorkConfig,
533 producer_ctx: &ProducerContext,
534 component_ctx: &dyn ComponentContext,
535 counter: Option<Arc<AtomicU64>>,
536 ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
537 let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
538 let parsed = parse_uri(uri)?;
539 let component = component_ctx
540 .resolve_component(&parsed.scheme)
541 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
542 let endpoint = component.create_endpoint(uri, component_ctx)?;
543 endpoint.create_producer(producer_ctx).map_err(|e| {
544 CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
545 })
546 };
547
548 let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
549 let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
550
551 let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
552 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
553 Ok((layer, counter))
554 }
555
556 pub(crate) fn resolve_steps(
558 &self,
559 steps: Vec<BuilderStep>,
560 producer_ctx: &ProducerContext,
561 registry: &Arc<std::sync::Mutex<Registry>>,
562 route_id: Option<&str>,
563 staging_mode: &super::step_resolution::FunctionStagingMode,
564 ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
565 let component_ctx = Arc::new(ControllerComponentContext::new(
566 Arc::clone(registry),
567 Arc::clone(&self.languages),
568 self.tracer_metrics
569 .clone()
570 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
571 Arc::clone(&self.platform_service),
572 self.health_registry(),
573 route_id.map(|s| s.to_string()),
574 ));
575
576 super::step_resolution::resolve_steps(
577 steps,
578 producer_ctx,
579 registry,
580 &self.languages,
581 &self.beans,
582 self.function_invoker.clone(),
583 component_ctx,
584 route_id,
585 staging_mode,
586 )
587 }
588
589 pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
599 let route_id = definition.route_id().to_string();
600
601 if self.routes.contains_key(&route_id) {
602 return Err(CamelError::RouteError(format!(
603 "Route '{}' already exists",
604 route_id
605 )));
606 }
607
608 info!(route_id = %route_id, "Adding route to controller");
609
610 let prepared = match self.build_managed_route(
611 definition,
612 &super::step_resolution::FunctionStagingMode::DirectAdd,
613 ) {
614 Ok(prepared) => prepared,
615 Err(err) => {
616 self.discard_function_staging();
617 return Err(err);
618 }
619 };
620
621 if let Some(invoker) = &self.function_invoker
622 && let Err(err) = invoker.commit_staged().await
623 {
624 invoker.discard_staging(0);
625 return Err(CamelError::Config(err.to_string()));
626 }
627
628 self.routes
629 .insert(prepared.route_id.clone(), prepared.managed);
630
631 Ok(())
632 }
633
634 fn build_managed_route(
635 &self,
636 definition: RouteDefinition,
637 staging_mode: &super::step_resolution::FunctionStagingMode,
638 ) -> Result<PreparedRoute, CamelError> {
639 let route_id = definition.route_id().to_string();
640
641 let definition_info = definition.to_info();
642 let RouteDefinition {
643 from_uri,
644 steps,
645 error_handler,
646 circuit_breaker,
647 security_policy,
648 security_authenticator,
649 unit_of_work,
650 concurrency,
651 ..
652 } = definition;
653
654 let producer_ctx = self.build_producer_context()?;
655
656 let mut aggregate_split: Option<AggregateSplitInfo> = None;
657 let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
658 Some((idx, agg_config)) => {
659 let mut pre_steps = steps;
660 let mut rest = pre_steps.split_off(idx);
661 let _agg_step = rest.remove(0);
662 let post_steps = rest;
663
664 let pre_pairs = self.resolve_steps(
665 pre_steps,
666 &producer_ctx,
667 &self.registry,
668 Some(&route_id),
669 staging_mode,
670 )?;
671 let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
672 let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
673 compose_pipeline(pre_procs),
674 )));
675
676 let post_pairs = self.resolve_steps(
677 post_steps,
678 &producer_ctx,
679 &self.registry,
680 Some(&route_id),
681 staging_mode,
682 )?;
683 let post_procs: Vec<BoxProcessor> =
684 post_pairs.into_iter().map(|(p, _)| p).collect();
685 let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
686 compose_pipeline(post_procs),
687 )));
688
689 aggregate_split = Some(AggregateSplitInfo {
690 pre_pipeline,
691 agg_config,
692 post_pipeline,
693 });
694
695 vec![]
696 }
697 None => self.resolve_steps(
698 steps,
699 &producer_ctx,
700 &self.registry,
701 Some(&route_id),
702 staging_mode,
703 )?,
704 };
705 let route_id_for_tracing = route_id.clone();
706 let mut pipeline = if processors_with_contracts.is_empty() {
707 BoxProcessor::new(IdentityProcessor)
708 } else {
709 compose_traced_pipeline_with_contracts(
710 processors_with_contracts,
711 &route_id_for_tracing,
712 self.tracing_enabled,
713 self.tracer_detail_level.clone(),
714 self.tracer_metrics.clone(),
715 )
716 };
717
718 if let Some(cb_config) = circuit_breaker {
719 let cb_layer = CircuitBreakerLayer::new(cb_config);
720 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
721 }
722
723 if let Some(sp_config) = security_policy.clone() {
724 let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
725 pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
726 }
727
728 let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
729
730 if let Some(config) = eh_config {
731 let component_ctx = ControllerComponentContext::new(
732 Arc::clone(&self.registry),
733 Arc::clone(&self.languages),
734 self.tracer_metrics
735 .clone()
736 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
737 Arc::clone(&self.platform_service),
738 self.health_registry(),
739 Some(route_id.clone()),
740 );
741 let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
742 pipeline = BoxProcessor::new(layer.layer(pipeline));
743 }
744
745 let uow_counter = if let Some(uow_config) = &unit_of_work {
746 let component_ctx = ControllerComponentContext::new(
747 Arc::clone(&self.registry),
748 Arc::clone(&self.languages),
749 self.tracer_metrics
750 .clone()
751 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
752 Arc::clone(&self.platform_service),
753 self.health_registry(),
754 Some(route_id.clone()),
755 );
756 let (uow_layer, counter) =
757 self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
758 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
759 Some(counter)
760 } else {
761 None
762 };
763
764 Ok(PreparedRoute {
765 route_id,
766 managed: ManagedRoute {
767 definition: definition_info,
768 from_uri,
769 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
770 concurrency,
771 consumer_handle: None,
772 pipeline_handle: None,
773 consumer_cancel_token: CancellationToken::new(),
774 pipeline_cancel_token: CancellationToken::new(),
775 channel_sender: None,
776 in_flight: uow_counter,
777 aggregate_split,
778 agg_service: None,
779 security_policy,
780 security_authenticator,
781 },
782 })
783 }
784
785 pub(crate) fn insert_prepared_route(
786 &mut self,
787 prepared: PreparedRoute,
788 ) -> Result<(), CamelError> {
789 if self.routes.contains_key(&prepared.route_id) {
790 return Err(CamelError::RouteError(format!(
791 "Route '{}' already exists",
792 prepared.route_id
793 )));
794 }
795 self.routes
796 .insert(prepared.route_id.clone(), prepared.managed);
797 Ok(())
798 }
799
800 pub async fn add_route_with_generation(
801 &mut self,
802 definition: RouteDefinition,
803 generation: u64,
804 ) -> Result<(), CamelError> {
805 let route_id = definition.route_id().to_string();
806
807 if self.routes.contains_key(&route_id) {
808 return Err(CamelError::RouteError(format!(
809 "Route '{}' already exists",
810 route_id
811 )));
812 }
813
814 info!(route_id = %route_id, generation, "Adding route to controller with generation");
815
816 let prepared = self.build_managed_route(
817 definition,
818 &super::step_resolution::FunctionStagingMode::HotReload { generation },
819 )?;
820
821 self.routes
822 .insert(prepared.route_id.clone(), prepared.managed);
823
824 Ok(())
825 }
826
827 pub(crate) fn prepare_route_definition_with_generation(
828 &self,
829 definition: RouteDefinition,
830 generation: u64,
831 ) -> Result<PreparedRoute, CamelError> {
832 self.build_managed_route(
833 definition,
834 &super::step_resolution::FunctionStagingMode::HotReload { generation },
835 )
836 }
837
838 pub async fn remove_route_preserving_functions(
839 &mut self,
840 route_id: &str,
841 ) -> Result<(), CamelError> {
842 let managed = self.routes.get(route_id).ok_or_else(|| {
843 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
844 })?;
845 if handle_is_running(&managed.consumer_handle)
846 || handle_is_running(&managed.pipeline_handle)
847 {
848 return Err(CamelError::RouteError(format!(
849 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
850 route_id,
851 inferred_lifecycle_label(managed)
852 )));
853 }
854 self.routes.remove(route_id);
855 if let Some(reg) = &self.health_registry {
856 reg.unregister_for_route(route_id);
857 }
858 info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
859 Ok(())
860 }
861
862 pub fn compile_route_definition(
863 &self,
864 def: RouteDefinition,
865 ) -> Result<BoxProcessor, CamelError> {
866 let route_id = def.route_id().to_string();
867
868 let producer_ctx = self.build_producer_context()?;
869
870 let processors_with_contracts = self.resolve_steps(
871 def.steps,
872 &producer_ctx,
873 &self.registry,
874 Some(&route_id),
875 &super::step_resolution::FunctionStagingMode::DryCompile,
876 )?;
877 let mut pipeline = compose_traced_pipeline_with_contracts(
878 processors_with_contracts,
879 &route_id,
880 self.tracing_enabled,
881 self.tracer_detail_level.clone(),
882 self.tracer_metrics.clone(),
883 );
884
885 if let Some(cb_config) = def.circuit_breaker {
886 let cb_layer = CircuitBreakerLayer::new(cb_config);
887 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
888 }
889
890 if let Some(sp_config) = def.security_policy {
891 let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
892 pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
893 }
894
895 let eh_config = def
896 .error_handler
897 .clone()
898 .or_else(|| self.global_error_handler.clone());
899 if let Some(config) = eh_config {
900 let component_ctx = ControllerComponentContext::new(
901 Arc::clone(&self.registry),
902 Arc::clone(&self.languages),
903 self.tracer_metrics
904 .clone()
905 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
906 Arc::clone(&self.platform_service),
907 self.health_registry(),
908 Some(route_id.clone()),
909 );
910 let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
911 pipeline = BoxProcessor::new(layer.layer(pipeline));
912 }
913
914 if let Some(uow_config) = &def.unit_of_work {
916 let existing_counter = self
917 .routes
918 .get(&route_id)
919 .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
920
921 let component_ctx = ControllerComponentContext::new(
922 Arc::clone(&self.registry),
923 Arc::clone(&self.languages),
924 self.tracer_metrics
925 .clone()
926 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
927 Arc::clone(&self.platform_service),
928 self.health_registry(),
929 Some(route_id.clone()),
930 );
931
932 let (uow_layer, _counter) = self.resolve_uow_layer(
933 uow_config,
934 &producer_ctx,
935 &component_ctx,
936 existing_counter,
937 )?;
938
939 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
940 }
941
942 Ok(pipeline)
943 }
944
945 pub fn compile_route_definition_with_generation(
946 &self,
947 def: RouteDefinition,
948 generation: u64,
949 ) -> Result<BoxProcessor, CamelError> {
950 let route_id = def.route_id().to_string();
951
952 let producer_ctx = self.build_producer_context()?;
953
954 let processors_with_contracts = self.resolve_steps(
955 def.steps,
956 &producer_ctx,
957 &self.registry,
958 Some(&route_id),
959 &super::step_resolution::FunctionStagingMode::HotReload { generation },
960 )?;
961 let mut pipeline = compose_traced_pipeline_with_contracts(
962 processors_with_contracts,
963 &route_id,
964 self.tracing_enabled,
965 self.tracer_detail_level.clone(),
966 self.tracer_metrics.clone(),
967 );
968
969 if let Some(cb_config) = def.circuit_breaker {
970 let cb_layer = CircuitBreakerLayer::new(cb_config);
971 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
972 }
973
974 if let Some(sp_config) = def.security_policy {
975 let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
976 pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
977 }
978
979 let eh_config = def
980 .error_handler
981 .clone()
982 .or_else(|| self.global_error_handler.clone());
983 if let Some(config) = eh_config {
984 let component_ctx = ControllerComponentContext::new(
985 Arc::clone(&self.registry),
986 Arc::clone(&self.languages),
987 self.tracer_metrics
988 .clone()
989 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
990 Arc::clone(&self.platform_service),
991 self.health_registry(),
992 Some(route_id.clone()),
993 );
994 let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
995 pipeline = BoxProcessor::new(layer.layer(pipeline));
996 }
997
998 if let Some(uow_config) = &def.unit_of_work {
999 let existing_counter = self
1000 .routes
1001 .get(&route_id)
1002 .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
1003
1004 let component_ctx = ControllerComponentContext::new(
1005 Arc::clone(&self.registry),
1006 Arc::clone(&self.languages),
1007 self.tracer_metrics
1008 .clone()
1009 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1010 Arc::clone(&self.platform_service),
1011 self.health_registry(),
1012 Some(route_id.clone()),
1013 );
1014
1015 let (uow_layer, _counter) = self.resolve_uow_layer(
1016 uow_config,
1017 &producer_ctx,
1018 &component_ctx,
1019 existing_counter,
1020 )?;
1021
1022 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1023 }
1024
1025 Ok(pipeline)
1026 }
1027
1028 pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1034 let managed = self.routes.get(route_id).ok_or_else(|| {
1035 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
1036 })?;
1037 if handle_is_running(&managed.consumer_handle)
1038 || handle_is_running(&managed.pipeline_handle)
1039 {
1040 return Err(CamelError::RouteError(format!(
1041 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
1042 route_id,
1043 inferred_lifecycle_label(managed)
1044 )));
1045 }
1046 if let Some(invoker) = &self.function_invoker {
1047 for (id, rid) in self.collect_function_refs(route_id) {
1048 if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
1049 warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
1050 }
1051 }
1052 }
1053 self.routes.remove(route_id);
1054 if let Some(reg) = &self.health_registry {
1055 reg.unregister_for_route(route_id);
1056 }
1057 info!(route_id = %route_id, "Route removed from controller");
1058 Ok(())
1059 }
1060
1061 fn collect_function_refs(
1062 &self,
1063 route_id: &str,
1064 ) -> Vec<(camel_api::FunctionId, Option<String>)> {
1065 self.function_invoker
1066 .as_ref()
1067 .map(|invoker| invoker.function_refs_for_route(route_id))
1068 .unwrap_or_default()
1069 }
1070
1071 fn discard_function_staging(&self) {
1072 if let Some(invoker) = &self.function_invoker {
1073 invoker.discard_staging(0);
1074 }
1075 }
1076
1077 pub fn route_count(&self) -> usize {
1079 self.routes.len()
1080 }
1081
1082 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1083 self.routes.get(route_id).map(|r| {
1084 r.in_flight
1085 .as_ref()
1086 .map_or(0, |c| c.load(Ordering::Relaxed))
1087 })
1088 }
1089
1090 pub fn route_exists(&self, route_id: &str) -> bool {
1092 self.routes.contains_key(route_id)
1093 }
1094
1095 pub fn route_ids(&self) -> Vec<String> {
1097 self.routes.keys().cloned().collect()
1098 }
1099
1100 pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
1101 self.routes
1102 .get(route_id)
1103 .and_then(|m| m.definition.source_hash())
1104 }
1105
1106 pub fn auto_startup_route_ids(&self) -> Vec<String> {
1108 let mut pairs: Vec<(String, i32)> = self
1109 .routes
1110 .iter()
1111 .filter(|(_, managed)| managed.definition.auto_startup())
1112 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1113 .collect();
1114 pairs.sort_by_key(|(_, order)| *order);
1115 pairs.into_iter().map(|(id, _)| id).collect()
1116 }
1117
1118 pub fn shutdown_route_ids(&self) -> Vec<String> {
1120 let mut pairs: Vec<(String, i32)> = self
1121 .routes
1122 .iter()
1123 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1124 .collect();
1125 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1126 pairs.into_iter().map(|(id, _)| id).collect()
1127 }
1128
1129 pub fn swap_pipeline(
1134 &self,
1135 route_id: &str,
1136 new_pipeline: BoxProcessor,
1137 ) -> Result<(), CamelError> {
1138 let managed = self
1139 .routes
1140 .get(route_id)
1141 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1142
1143 if managed.aggregate_split.is_some() {
1144 tracing::warn!(
1145 route_id = %route_id,
1146 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
1147 );
1148 }
1149
1150 managed
1151 .pipeline
1152 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1153 info!(route_id = %route_id, "Pipeline swapped atomically");
1154 Ok(())
1155 }
1156
1157 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1159 self.routes.get(route_id).map(|r| r.from_uri.clone())
1160 }
1161
1162 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1167 self.routes
1168 .get(route_id)
1169 .map(|r| r.pipeline.load().0.clone())
1170 }
1171
1172 async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1174 super::consumer_management::stop_route_internal(
1175 &mut self.routes,
1176 route_id,
1177 DEFAULT_SHUTDOWN_TIMEOUT,
1178 )
1179 .await
1180 }
1181
1182 pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1183 self.start_route(route_id).await
1184 }
1185
1186 pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1187 self.stop_route(route_id).await
1188 }
1189}
1190
1191#[async_trait::async_trait]
1192impl RouteController for DefaultRouteController {
1193 async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1194 {
1196 let managed = self
1197 .routes
1198 .get_mut(route_id)
1199 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1200
1201 let consumer_running = handle_is_running(&managed.consumer_handle);
1202 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1203 if consumer_running && pipeline_running {
1204 return Ok(());
1205 }
1206 if !consumer_running && pipeline_running {
1207 return Err(CamelError::RouteError(format!(
1208 "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
1209 route_id
1210 )));
1211 }
1212 if consumer_running && !pipeline_running {
1213 return Err(CamelError::RouteError(format!(
1214 "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
1215 route_id
1216 )));
1217 }
1218 }
1219
1220 info!(route_id = %route_id, "Starting route");
1221
1222 let (from_uri, pipeline, concurrency) = {
1224 let managed = self
1225 .routes
1226 .get(route_id)
1227 .expect("invariant: route must exist after prior existence check"); (
1229 managed.from_uri.clone(),
1230 Arc::clone(&managed.pipeline),
1231 managed.concurrency.clone(),
1232 )
1233 };
1234
1235 let crash_notifier = self.crash_notifier.clone();
1237 let runtime_for_consumer = self.runtime.clone();
1238
1239 let (mut consumer, consumer_concurrency) =
1240 super::consumer_management::create_route_consumer(
1241 &self.registry,
1242 &from_uri,
1243 &ControllerComponentContext::new(
1244 Arc::clone(&self.registry),
1245 Arc::clone(&self.languages),
1246 self.tracer_metrics
1247 .clone()
1248 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1249 Arc::clone(&self.platform_service),
1250 self.health_registry(),
1251 Some(route_id.to_string()),
1252 ),
1253 )?;
1254
1255 let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1257
1258 let managed = self
1260 .routes
1261 .get_mut(route_id)
1262 .expect("invariant: route must exist after prior existence check"); if let (Some(sp_config), Some(authenticator)) = (
1266 managed.security_policy.as_ref(),
1267 managed.security_authenticator.as_ref(),
1268 ) {
1269 use camel_component_api::SecurityContext;
1270 let sec_ctx =
1271 SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
1272 consumer.set_security_context(sec_ctx);
1273 }
1274
1275 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1277 let consumer_cancel = managed.consumer_cancel_token.child_token();
1279 let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1280 let tx_for_storage = tx.clone();
1282 let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1283
1284 let managed = self
1286 .routes
1287 .get_mut(route_id)
1288 .expect("invariant: route must exist after prior existence check"); if let Some(split) = managed.aggregate_split.as_ref() {
1291 let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
1292
1293 let route_cancel_clone = pipeline_cancel.clone();
1294 let svc = AggregatorService::new(
1295 split.agg_config.clone(),
1296 late_tx,
1297 Arc::clone(&self.languages),
1298 route_cancel_clone,
1299 );
1300 let agg = Arc::new(std::sync::Mutex::new(svc));
1301
1302 managed.agg_service = Some(Arc::clone(&agg));
1303
1304 let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
1305 let pre_pipeline = Arc::clone(&split.pre_pipeline);
1306 let post_pipeline = Arc::clone(&split.post_pipeline);
1307
1308 let pipeline_handle = tokio::spawn(async move {
1310 loop {
1311 tokio::select! {
1312 biased;
1313
1314 late_ex = async {
1315 let mut rx = late_rx.lock().await;
1316 rx.recv().await
1317 } => {
1318 match late_ex {
1319 Some(ex) => {
1320 let pipe = post_pipeline.load();
1321 if let Err(e) = pipe.0.clone().oneshot(ex).await {
1322 tracing::warn!(error = %e, "late exchange post-pipeline failed");
1323 }
1324 }
1325 None => return,
1326 }
1327 }
1328
1329 envelope_opt = rx.recv() => {
1330 match envelope_opt {
1331 Some(envelope) => {
1332 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1333 let pre_pipe = pre_pipeline.load();
1334 let ex = match pre_pipe.0.clone().oneshot(exchange).await {
1335 Ok(ex) => ex,
1336 Err(e) => {
1337 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1338 continue;
1339 }
1340 };
1341
1342 let ex = {
1343 let cloned_svc = agg
1344 .lock()
1345 .expect("mutex poisoned: another thread panicked while holding this lock") .clone();
1347 cloned_svc.oneshot(ex).await
1348 };
1349
1350 match ex {
1351 Ok(ex) => {
1352 if !is_pending(&ex) {
1353 let post_pipe = post_pipeline.load();
1354 let out = post_pipe.0.clone().oneshot(ex).await;
1355 if let Some(tx) = reply_tx { let _ = tx.send(out); }
1356 } else if let Some(tx) = reply_tx {
1357 let _ = tx.send(Ok(ex));
1358 }
1359 }
1360 Err(e) => {
1361 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1362 }
1363 }
1364 }
1365 None => return,
1366 }
1367 }
1368
1369 _ = pipeline_cancel.cancelled() => {
1370 {
1371 let guard = agg
1372 .lock()
1373 .expect("mutex poisoned: another thread panicked while holding this lock"); guard.force_complete_all();
1375 }
1376 let mut rx_guard = late_rx.lock().await;
1377 while let Ok(late_ex) = rx_guard.try_recv() {
1378 let pipe = post_pipeline.load();
1379 let _ = pipe.0.clone().oneshot(late_ex).await;
1380 }
1381 break;
1382 }
1383 }
1384 }
1385 });
1386 #[cfg(test)]
1387 emit_start_route_event("pipeline_spawned");
1388
1389 let consumer_handle = super::consumer_management::spawn_consumer_task(
1392 route_id.to_string(),
1393 consumer,
1394 consumer_ctx,
1395 crash_notifier,
1396 runtime_for_consumer,
1397 false,
1398 );
1399 #[cfg(test)]
1400 emit_start_route_event("consumer_spawned");
1401
1402 let managed = self
1403 .routes
1404 .get_mut(route_id)
1405 .expect("invariant: route must exist"); managed.consumer_handle = Some(consumer_handle);
1407 managed.pipeline_handle = Some(pipeline_handle);
1408 managed.channel_sender = Some(tx_for_storage);
1409
1410 info!(route_id = %route_id, "Route started (aggregate with timeout)");
1411 return Ok(());
1412 }
1413 let pipeline_handle = match effective_concurrency {
1417 ConcurrencyModel::Sequential => {
1418 tokio::spawn(async move {
1419 loop {
1420 let envelope = tokio::select! {
1422 envelope = rx.recv() => match envelope {
1423 Some(e) => e,
1424 None => return, },
1426 _ = pipeline_cancel.cancelled() => {
1427 return;
1429 }
1430 };
1431 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1432
1433 let mut pipeline = pipeline.load().0.clone();
1435
1436 if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1437 if let Some(tx) = reply_tx {
1438 let _ = tx.send(Err(e));
1439 }
1440 return;
1441 }
1442
1443 let result = pipeline.call(exchange).await;
1444 if let Some(tx) = reply_tx {
1445 let _ = tx.send(result);
1446 } else if let Err(ref e) = result
1447 && !matches!(e, CamelError::Stopped)
1448 {
1449 error!("Pipeline error: {e}");
1450 }
1451 }
1452 })
1453 }
1454 ConcurrencyModel::Concurrent { max } => {
1455 let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1456 tokio::spawn(async move {
1457 loop {
1458 let envelope = tokio::select! {
1460 envelope = rx.recv() => match envelope {
1461 Some(e) => e,
1462 None => return, },
1464 _ = pipeline_cancel.cancelled() => {
1465 return;
1467 }
1468 };
1469 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1470 let pipe_ref = Arc::clone(&pipeline);
1471 let sem = sem.clone();
1472 let cancel = pipeline_cancel.clone();
1473 tokio::spawn(async move {
1474 let _permit = match &sem {
1476 Some(s) => Some(s.acquire().await.expect("semaphore closed")), None => None,
1478 };
1479
1480 let mut pipe = pipe_ref.load().0.clone();
1482
1483 if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1485 if let Some(tx) = reply_tx {
1486 let _ = tx.send(Err(e));
1487 }
1488 return;
1489 }
1490
1491 let result = pipe.call(exchange).await;
1492 if let Some(tx) = reply_tx {
1493 let _ = tx.send(result);
1494 } else if let Err(ref e) = result
1495 && !matches!(e, CamelError::Stopped)
1496 {
1497 error!("Pipeline error: {e}");
1498 }
1499 });
1500 }
1501 })
1502 }
1503 };
1504 #[cfg(test)]
1505 emit_start_route_event("pipeline_spawned");
1506
1507 let consumer_handle = super::consumer_management::spawn_consumer_task(
1510 route_id.to_string(),
1511 consumer,
1512 consumer_ctx,
1513 crash_notifier,
1514 runtime_for_consumer,
1515 false,
1516 );
1517 #[cfg(test)]
1518 emit_start_route_event("consumer_spawned");
1519
1520 let managed = self
1522 .routes
1523 .get_mut(route_id)
1524 .expect("invariant: route must exist after prior existence check"); managed.consumer_handle = Some(consumer_handle);
1526 managed.pipeline_handle = Some(pipeline_handle);
1527 managed.channel_sender = Some(tx_for_storage);
1528
1529 info!(route_id = %route_id, "Route started");
1530 Ok(())
1531 }
1532
1533 async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1534 self.stop_route_internal(route_id).await?;
1535 if let Some(reg) = &self.health_registry {
1536 reg.unregister_for_route(route_id);
1537 }
1538 Ok(())
1539 }
1540
1541 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1542 self.stop_route(route_id).await?;
1543 tokio::time::sleep(Duration::from_millis(100)).await;
1544 self.start_route(route_id).await
1545 }
1546
1547 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1548 let managed = self
1550 .routes
1551 .get_mut(route_id)
1552 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1553
1554 let consumer_running = handle_is_running(&managed.consumer_handle);
1555 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1556
1557 if !consumer_running || !pipeline_running {
1559 return Err(CamelError::RouteError(format!(
1560 "Cannot suspend route '{}' with execution lifecycle {}",
1561 route_id,
1562 inferred_lifecycle_label(managed)
1563 )));
1564 }
1565
1566 info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1567
1568 let managed = self
1570 .routes
1571 .get_mut(route_id)
1572 .expect("invariant: route must exist after prior existence check"); managed.consumer_cancel_token.cancel();
1574
1575 let managed = self
1577 .routes
1578 .get_mut(route_id)
1579 .expect("invariant: route must exist after prior existence check"); let consumer_handle = managed.consumer_handle.take();
1581
1582 let timeout_result = tokio::time::timeout(DEFAULT_SHUTDOWN_TIMEOUT, async {
1584 if let Some(handle) = consumer_handle {
1585 let _ = handle.await;
1586 }
1587 })
1588 .await;
1589
1590 if timeout_result.is_err() {
1591 warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1592 }
1593
1594 let managed = self
1596 .routes
1597 .get_mut(route_id)
1598 .expect("invariant: route must exist after prior existence check"); managed.consumer_cancel_token = CancellationToken::new();
1602
1603 info!(route_id = %route_id, "Route suspended (pipeline still running)");
1604 Ok(())
1605 }
1606
1607 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1608 let managed = self
1610 .routes
1611 .get(route_id)
1612 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1613
1614 let consumer_running = handle_is_running(&managed.consumer_handle);
1615 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1616 if consumer_running || !pipeline_running {
1617 return Err(CamelError::RouteError(format!(
1618 "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1619 route_id,
1620 inferred_lifecycle_label(managed)
1621 )));
1622 }
1623
1624 let sender = managed.channel_sender.clone().ok_or_else(|| {
1626 CamelError::RouteError("Suspended route has no channel sender".into())
1627 })?;
1628
1629 let from_uri = managed.from_uri.clone();
1631
1632 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1633
1634 let (mut consumer, _) = super::consumer_management::create_route_consumer(
1635 &self.registry,
1636 &from_uri,
1637 &ControllerComponentContext::new(
1638 Arc::clone(&self.registry),
1639 Arc::clone(&self.languages),
1640 self.tracer_metrics
1641 .clone()
1642 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1643 Arc::clone(&self.platform_service),
1644 self.health_registry(),
1645 Some(route_id.to_string()),
1646 ),
1647 )?;
1648
1649 let managed = self
1651 .routes
1652 .get(route_id)
1653 .expect("invariant: route must exist after prior existence check"); if let (Some(sp_config), Some(authenticator)) = (
1655 managed.security_policy.as_ref(),
1656 managed.security_authenticator.as_ref(),
1657 ) {
1658 use camel_component_api::SecurityContext;
1659 let sec_ctx =
1660 SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
1661 consumer.set_security_context(sec_ctx);
1662 }
1663
1664 let managed = self
1666 .routes
1667 .get_mut(route_id)
1668 .expect("invariant: route must exist after prior existence check"); let consumer_cancel = managed.consumer_cancel_token.child_token();
1672
1673 let crash_notifier = self.crash_notifier.clone();
1674 let runtime_for_consumer = self.runtime.clone();
1675
1676 let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1678
1679 let consumer_handle = super::consumer_management::spawn_consumer_task(
1681 route_id.to_string(),
1682 consumer,
1683 consumer_ctx,
1684 crash_notifier,
1685 runtime_for_consumer,
1686 true,
1687 );
1688
1689 let managed = self
1691 .routes
1692 .get_mut(route_id)
1693 .expect("invariant: route must exist after prior existence check"); managed.consumer_handle = Some(consumer_handle);
1695
1696 info!(route_id = %route_id, "Route resumed");
1697 Ok(())
1698 }
1699
1700 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1701 let route_ids: Vec<String> = {
1704 let mut pairs: Vec<_> = self
1705 .routes
1706 .iter()
1707 .filter(|(_, r)| r.definition.auto_startup())
1708 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1709 .collect();
1710 pairs.sort_by_key(|(_, order)| *order);
1711 pairs.into_iter().map(|(id, _)| id).collect()
1712 };
1713
1714 info!("Starting {} auto-startup routes", route_ids.len());
1715
1716 let mut errors: Vec<String> = Vec::new();
1718 for route_id in route_ids {
1719 if let Err(e) = self.start_route(&route_id).await {
1720 errors.push(format!("Route '{}': {}", route_id, e));
1721 }
1722 }
1723
1724 if !errors.is_empty() {
1725 return Err(CamelError::RouteError(format!(
1726 "Failed to start routes: {}",
1727 errors.join(", ")
1728 )));
1729 }
1730
1731 info!("All auto-startup routes started");
1732 Ok(())
1733 }
1734
1735 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1736 let route_ids: Vec<String> = {
1738 let mut pairs: Vec<_> = self
1739 .routes
1740 .iter()
1741 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1742 .collect();
1743 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1744 pairs.into_iter().map(|(id, _)| id).collect()
1745 };
1746
1747 info!("Stopping {} routes", route_ids.len());
1748
1749 for route_id in route_ids {
1750 let _ = self.stop_route(&route_id).await;
1751 }
1752
1753 info!("All routes stopped");
1754 Ok(())
1755 }
1756}
1757
1758#[cfg(test)]
1759#[path = "route_controller_tests.rs"]
1760mod tests;