1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21 BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
22 NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeCommand,
23 RuntimeHandle,
24};
25use camel_component_api::{
26 ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
27};
28use camel_endpoint::parse_uri;
29pub use camel_processor::aggregator::SharedLanguageRegistry;
30use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
31use camel_processor::circuit_breaker::CircuitBreakerLayer;
32use camel_processor::error_handler::ErrorHandlerLayer;
33
34use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
35use crate::lifecycle::adapters::route_compiler::{
36 compose_pipeline, compose_traced_pipeline_with_contracts,
37};
38use crate::lifecycle::application::route_definition::{
39 BuilderStep, RouteDefinition, RouteDefinitionInfo,
40};
41use crate::shared::components::domain::Registry;
42use crate::shared::observability::domain::{DetailLevel, TracerConfig};
43use arc_swap::ArcSwap;
44use camel_bean::BeanRegistry;
45
/// Message describing a route failure, sent over the channel registered with
/// `DefaultRouteController::set_crash_notifier`.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// Identifier of the route the notification refers to.
    pub route_id: String,
    /// Textual description of the failure.
    pub error: String,
}
54
/// Newtype around [`BoxProcessor`] that is manually marked `Sync` so it can be
/// stored inside an `ArcSwap` and shared between tasks.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);

// SAFETY: NOTE(review) — this impl asserts that a `&SyncBoxProcessor` may be used
// from several threads at once. The visible call sites only `load()` the wrapper
// and `clone()` the inner `BoxProcessor` before driving the clone; confirm that
// cloning a `BoxProcessor` through a shared reference is sound under concurrent
// access, since nothing in this file proves it.
unsafe impl Sync for SyncBoxProcessor {}

/// A pipeline that can be replaced atomically while readers keep using the
/// previously loaded value.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
73
/// Test-only callback type: receives a static event label.
#[cfg(test)]
type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;

/// Test-only, process-wide slot holding the currently installed hook (if any).
/// Lazily initialised to `None` on first access.
#[cfg(test)]
static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
80
/// Installs (`Some`) or clears (`None`) the test-only start-route event hook.
///
/// # Panics
/// Panics if the hook mutex is poisoned.
#[cfg(test)]
fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
    let mut slot = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    *slot = hook;
}
87
/// Invokes the installed test hook with `event`; does nothing when no hook is set.
///
/// The hook runs while the hook mutex is held.
///
/// # Panics
/// Panics if the hook mutex is poisoned.
#[cfg(test)]
fn emit_start_route_event(event: &'static str) {
    let slot = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    match slot.as_ref() {
        Some(hook) => hook(event),
        None => {}
    }
}
98
/// Pieces of a route that was split around a top-level aggregate step with a
/// timeout completion condition.
pub(super) struct AggregateSplitInfo {
    /// Pipeline composed from the steps that precede the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    /// Pipeline composed from the steps that follow the aggregate step.
    pub(super) post_pipeline: SharedPipeline,
}
105
/// Controller-side state of a single registered route.
pub(super) struct ManagedRoute {
    /// Metadata snapshot of the definition (auto-startup flag, startup order, source hash).
    pub(super) definition: RouteDefinitionInfo,
    /// URI of the endpoint the route consumes from.
    pub(super) from_uri: String,
    /// Current processing pipeline; replaceable via `swap_pipeline`.
    pub(super) pipeline: SharedPipeline,
    /// Route-level concurrency override; when `None` the consumer's own model is used.
    pub(super) concurrency: Option<ConcurrencyModel>,
    /// Handle of the consumer task, if one has been spawned.
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    /// Handle of the pipeline task, if one has been spawned.
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    /// Parent token from which the consumer's cancellation token is derived.
    pub(super) consumer_cancel_token: CancellationToken,
    /// Parent token from which the pipeline's cancellation token is derived.
    pub(super) pipeline_cancel_token: CancellationToken,
    /// Sender side of the consumer-to-pipeline channel.
    /// NOTE(review): presumably populated while the route runs — confirm in `start_route`.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// In-flight exchange counter shared with the unit-of-work layer; `None` when
    /// the route has no unit-of-work configuration.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Present when the route was split around an aggregate-with-timeout step.
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    /// Aggregator service instance created when a split route is started.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
133
/// A fully built route that has not yet been inserted into the controller.
pub(crate) struct PreparedRoute {
    /// Identifier under which the route will be registered.
    pub(crate) route_id: String,
    /// The built route state.
    pub(super) managed: ManagedRoute,
}
138
139pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
140 handle.as_ref().is_some_and(|h| !h.is_finished())
141}
142
143fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
144 match (
145 handle_is_running(&managed.consumer_handle),
146 handle_is_running(&managed.pipeline_handle),
147 ) {
148 (true, true) => "Started",
149 (false, true) => "Suspended",
150 (true, false) => "Stopping",
151 (false, false) => "Stopped",
152 }
153}
154
155fn find_top_level_aggregate_with_timeout(
156 steps: &[BuilderStep],
157) -> Option<(usize, AggregatorConfig)> {
158 for (i, step) in steps.iter().enumerate() {
159 if let BuilderStep::Aggregate { config } = step {
160 if has_timeout_condition(&config.completion) {
161 return Some((i, config.clone()));
162 }
163 break;
164 }
165 }
166 None
167}
168
/// [`ComponentContext`] implementation backed by the controller's shared
/// registries and services.
pub(crate) struct ControllerComponentContext {
    /// Component registry used to resolve URI schemes.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Language registry used to resolve languages by name.
    languages: SharedLanguageRegistry,
    /// Metrics collector handed out to components.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service handed out to components.
    platform_service: Arc<dyn PlatformService>,
}
175
impl ControllerComponentContext {
    /// Builds a context from the given shared handles; nothing is validated or
    /// locked at construction time.
    pub(crate) fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        metrics: Arc<dyn MetricsCollector>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            registry,
            languages,
            metrics,
            platform_service,
        }
    }
}
191
192impl ComponentContext for ControllerComponentContext {
193 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
194 self.registry.lock().ok()?.get(scheme)
195 }
196
197 fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
198 self.languages.lock().ok()?.get(name).cloned()
199 }
200
201 fn metrics(&self) -> Arc<dyn MetricsCollector> {
202 Arc::clone(&self.metrics)
203 }
204
205 fn platform_service(&self) -> Arc<dyn PlatformService> {
206 Arc::clone(&self.platform_service)
207 }
208}
209
210fn is_pending(ex: &Exchange) -> bool {
211 ex.property("CamelAggregatorPending")
212 .and_then(|v| v.as_bool())
213 .unwrap_or(false)
214}
215
216async fn ready_with_backoff(
223 pipeline: &mut BoxProcessor,
224 cancel: &CancellationToken,
225) -> Result<(), CamelError> {
226 loop {
227 match pipeline.ready().await {
228 Ok(_) => return Ok(()),
229 Err(CamelError::CircuitOpen(ref msg)) => {
230 warn!("Circuit open, backing off: {msg}");
231 tokio::select! {
232 _ = tokio::time::sleep(Duration::from_secs(1)) => {
233 continue;
234 }
235 _ = cancel.cancelled() => {
236 return Err(CamelError::CircuitOpen(msg.clone()));
238 }
239 }
240 }
241 Err(e) => {
242 error!("Pipeline not ready: {e}");
243 return Err(e);
244 }
245 }
246 }
247}
248
249fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
250 let stamp = std::time::SystemTime::now()
251 .duration_since(std::time::UNIX_EPOCH)
252 .unwrap_or_default()
253 .as_nanos();
254 RuntimeCommand::FailRoute {
255 route_id: route_id.to_string(),
256 error: error.to_string(),
257 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
258 causation_id: None,
259 }
260}
261
262pub(super) async fn publish_runtime_failure(
263 runtime: Option<Weak<dyn RuntimeHandle>>,
264 route_id: &str,
265 error: &str,
266) {
267 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
268 return;
269 };
270 let command = runtime_failure_command(route_id, error);
271 if let Err(runtime_error) = runtime.execute(command).await {
272 warn!(
273 route_id = %route_id,
274 error = %runtime_error,
275 "failed to synchronize route crash with runtime projection"
276 );
277 }
278}
279
/// Default route controller: owns the registered routes and the shared services
/// needed to build and run their pipelines.
pub struct DefaultRouteController {
    /// Registered routes keyed by route id.
    routes: HashMap<String, ManagedRoute>,
    /// Shared component registry.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry.
    languages: SharedLanguageRegistry,
    /// Shared bean registry.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Weak handle to the runtime; unset until `set_runtime_handle` is called.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Error-handler configuration used for routes that do not define their own.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Channel on which crash notifications are sent, if configured.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether pipelines are composed with tracing enabled.
    tracing_enabled: bool,
    /// Detail level passed to traced pipelines.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing; `NoOpMetrics` is substituted when unset.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform service handed to component contexts.
    platform_service: Arc<dyn PlatformService>,
    /// Optional function invoker used for function staging and unregistration.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
}
311
312impl DefaultRouteController {
313 pub fn new(
315 registry: Arc<std::sync::Mutex<Registry>>,
316 platform_service: Arc<dyn PlatformService>,
317 ) -> Self {
318 Self::with_beans_and_platform_service(
319 registry,
320 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
321 platform_service,
322 )
323 }
324
325 pub fn with_beans(
327 registry: Arc<std::sync::Mutex<Registry>>,
328 beans: Arc<std::sync::Mutex<BeanRegistry>>,
329 ) -> Self {
330 Self::with_beans_and_platform_service(
331 registry,
332 beans,
333 Arc::new(NoopPlatformService::default()),
334 )
335 }
336
337 fn with_beans_and_platform_service(
338 registry: Arc<std::sync::Mutex<Registry>>,
339 beans: Arc<std::sync::Mutex<BeanRegistry>>,
340 platform_service: Arc<dyn PlatformService>,
341 ) -> Self {
342 Self {
343 routes: HashMap::new(),
344 registry,
345 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
346 beans,
347 runtime: None,
348 global_error_handler: None,
349 crash_notifier: None,
350 tracing_enabled: false,
351 tracer_detail_level: DetailLevel::Minimal,
352 tracer_metrics: None,
353 platform_service,
354 function_invoker: None,
355 }
356 }
357
358 pub fn with_languages(
360 registry: Arc<std::sync::Mutex<Registry>>,
361 languages: SharedLanguageRegistry,
362 platform_service: Arc<dyn PlatformService>,
363 ) -> Self {
364 Self {
365 routes: HashMap::new(),
366 registry,
367 languages,
368 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
369 runtime: None,
370 global_error_handler: None,
371 crash_notifier: None,
372 tracing_enabled: false,
373 tracer_detail_level: DetailLevel::Minimal,
374 tracer_metrics: None,
375 platform_service,
376 function_invoker: None,
377 }
378 }
379
380 pub fn with_languages_and_beans(
381 registry: Arc<std::sync::Mutex<Registry>>,
382 languages: SharedLanguageRegistry,
383 platform_service: Arc<dyn PlatformService>,
384 beans: Arc<std::sync::Mutex<BeanRegistry>>,
385 ) -> Self {
386 Self {
387 routes: HashMap::new(),
388 registry,
389 languages,
390 beans,
391 runtime: None,
392 global_error_handler: None,
393 crash_notifier: None,
394 tracing_enabled: false,
395 tracer_detail_level: DetailLevel::Minimal,
396 tracer_metrics: None,
397 platform_service,
398 function_invoker: None,
399 }
400 }
401
402 pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
403 self.function_invoker = Some(function_invoker);
404 self
405 }
406
    /// Installs the function invoker, replacing any previously set one.
    pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
        self.function_invoker = Some(invoker);
    }
410
411 pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
413 self.runtime = Some(Arc::downgrade(&runtime));
414 }
415
    /// Sets the channel on which [`CrashNotification`]s are sent, replacing any
    /// previously configured sender.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }
423
    /// Sets the global error-handler configuration, applied to routes that do not
    /// define their own error handler.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }
428
429 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
431 self.tracing_enabled = config.enabled;
432 self.tracer_detail_level = config.detail_level.clone();
433 self.tracer_metrics = config.metrics_collector.clone();
434 }
435
436 fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
437 let mut producer_ctx = ProducerContext::new();
438 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
439 producer_ctx = producer_ctx.with_runtime(runtime);
440 }
441 Ok(producer_ctx)
442 }
443
444 fn resolve_error_handler(
446 &self,
447 config: ErrorHandlerConfig,
448 producer_ctx: &ProducerContext,
449 component_ctx: &dyn ComponentContext,
450 ) -> Result<ErrorHandlerLayer, CamelError> {
451 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
453 let parsed = parse_uri(uri)?;
454 let component = component_ctx
455 .resolve_component(&parsed.scheme)
456 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
457 let endpoint = component.create_endpoint(uri, component_ctx)?;
458 Some(endpoint.create_producer(producer_ctx)?)
459 } else {
460 None
461 };
462
463 let mut resolved_policies = Vec::new();
465 for policy in config.policies {
466 let handler_producer = if let Some(ref uri) = policy.handled_by {
467 let parsed = parse_uri(uri)?;
468 let component = component_ctx
469 .resolve_component(&parsed.scheme)
470 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
471 let endpoint = component.create_endpoint(uri, component_ctx)?;
472 Some(endpoint.create_producer(producer_ctx)?)
473 } else {
474 None
475 };
476 resolved_policies.push((policy, handler_producer));
477 }
478
479 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
480 }
481
482 fn resolve_uow_layer(
485 &self,
486 config: &UnitOfWorkConfig,
487 producer_ctx: &ProducerContext,
488 component_ctx: &dyn ComponentContext,
489 counter: Option<Arc<AtomicU64>>,
490 ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
491 let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
492 let parsed = parse_uri(uri)?;
493 let component = component_ctx
494 .resolve_component(&parsed.scheme)
495 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
496 let endpoint = component.create_endpoint(uri, component_ctx)?;
497 endpoint.create_producer(producer_ctx).map_err(|e| {
498 CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
499 })
500 };
501
502 let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
503 let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
504
505 let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
506 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
507 Ok((layer, counter))
508 }
509
510 pub(crate) fn resolve_steps(
512 &self,
513 steps: Vec<BuilderStep>,
514 producer_ctx: &ProducerContext,
515 registry: &Arc<std::sync::Mutex<Registry>>,
516 route_id: Option<&str>,
517 staging_mode: &super::step_resolution::FunctionStagingMode,
518 ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
519 let component_ctx = Arc::new(ControllerComponentContext::new(
520 Arc::clone(registry),
521 Arc::clone(&self.languages),
522 self.tracer_metrics
523 .clone()
524 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
525 Arc::clone(&self.platform_service),
526 ));
527
528 super::step_resolution::resolve_steps(
529 steps,
530 producer_ctx,
531 registry,
532 &self.languages,
533 &self.beans,
534 self.function_invoker.clone(),
535 component_ctx,
536 route_id,
537 staging_mode,
538 )
539 }
540
541 pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
551 let route_id = definition.route_id().to_string();
552
553 if self.routes.contains_key(&route_id) {
554 return Err(CamelError::RouteError(format!(
555 "Route '{}' already exists",
556 route_id
557 )));
558 }
559
560 info!(route_id = %route_id, "Adding route to controller");
561
562 let prepared = match self.build_managed_route(
563 definition,
564 &super::step_resolution::FunctionStagingMode::DirectAdd,
565 ) {
566 Ok(prepared) => prepared,
567 Err(err) => {
568 self.discard_function_staging();
569 return Err(err);
570 }
571 };
572
573 if let Some(invoker) = &self.function_invoker
574 && let Err(err) = invoker.commit_staged().await
575 {
576 invoker.discard_staging(0);
577 return Err(CamelError::Config(err.to_string()));
578 }
579
580 self.routes
581 .insert(prepared.route_id.clone(), prepared.managed);
582
583 Ok(())
584 }
585
586 fn build_managed_route(
587 &self,
588 definition: RouteDefinition,
589 staging_mode: &super::step_resolution::FunctionStagingMode,
590 ) -> Result<PreparedRoute, CamelError> {
591 let route_id = definition.route_id().to_string();
592
593 let definition_info = definition.to_info();
594 let RouteDefinition {
595 from_uri,
596 steps,
597 error_handler,
598 circuit_breaker,
599 unit_of_work,
600 concurrency,
601 ..
602 } = definition;
603
604 let producer_ctx = self.build_producer_context()?;
605
606 let mut aggregate_split: Option<AggregateSplitInfo> = None;
607 let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
608 Some((idx, agg_config)) => {
609 let mut pre_steps = steps;
610 let mut rest = pre_steps.split_off(idx);
611 let _agg_step = rest.remove(0);
612 let post_steps = rest;
613
614 let pre_pairs = self.resolve_steps(
615 pre_steps,
616 &producer_ctx,
617 &self.registry,
618 Some(&route_id),
619 staging_mode,
620 )?;
621 let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
622 let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
623 compose_pipeline(pre_procs),
624 )));
625
626 let post_pairs = self.resolve_steps(
627 post_steps,
628 &producer_ctx,
629 &self.registry,
630 Some(&route_id),
631 staging_mode,
632 )?;
633 let post_procs: Vec<BoxProcessor> =
634 post_pairs.into_iter().map(|(p, _)| p).collect();
635 let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
636 compose_pipeline(post_procs),
637 )));
638
639 aggregate_split = Some(AggregateSplitInfo {
640 pre_pipeline,
641 agg_config,
642 post_pipeline,
643 });
644
645 vec![]
646 }
647 None => self.resolve_steps(
648 steps,
649 &producer_ctx,
650 &self.registry,
651 Some(&route_id),
652 staging_mode,
653 )?,
654 };
655 let route_id_for_tracing = route_id.clone();
656 let mut pipeline = if processors_with_contracts.is_empty() {
657 BoxProcessor::new(IdentityProcessor)
658 } else {
659 compose_traced_pipeline_with_contracts(
660 processors_with_contracts,
661 &route_id_for_tracing,
662 self.tracing_enabled,
663 self.tracer_detail_level.clone(),
664 self.tracer_metrics.clone(),
665 )
666 };
667
668 if let Some(cb_config) = circuit_breaker {
669 let cb_layer = CircuitBreakerLayer::new(cb_config);
670 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
671 }
672
673 let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
674
675 if let Some(config) = eh_config {
676 let component_ctx = ControllerComponentContext::new(
677 Arc::clone(&self.registry),
678 Arc::clone(&self.languages),
679 self.tracer_metrics
680 .clone()
681 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
682 Arc::clone(&self.platform_service),
683 );
684 let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
685 pipeline = BoxProcessor::new(layer.layer(pipeline));
686 }
687
688 let uow_counter = if let Some(uow_config) = &unit_of_work {
689 let component_ctx = ControllerComponentContext::new(
690 Arc::clone(&self.registry),
691 Arc::clone(&self.languages),
692 self.tracer_metrics
693 .clone()
694 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
695 Arc::clone(&self.platform_service),
696 );
697 let (uow_layer, counter) =
698 self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
699 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
700 Some(counter)
701 } else {
702 None
703 };
704
705 Ok(PreparedRoute {
706 route_id,
707 managed: ManagedRoute {
708 definition: definition_info,
709 from_uri,
710 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
711 concurrency,
712 consumer_handle: None,
713 pipeline_handle: None,
714 consumer_cancel_token: CancellationToken::new(),
715 pipeline_cancel_token: CancellationToken::new(),
716 channel_sender: None,
717 in_flight: uow_counter,
718 aggregate_split,
719 agg_service: None,
720 },
721 })
722 }
723
724 pub(crate) fn insert_prepared_route(
725 &mut self,
726 prepared: PreparedRoute,
727 ) -> Result<(), CamelError> {
728 if self.routes.contains_key(&prepared.route_id) {
729 return Err(CamelError::RouteError(format!(
730 "Route '{}' already exists",
731 prepared.route_id
732 )));
733 }
734 self.routes
735 .insert(prepared.route_id.clone(), prepared.managed);
736 Ok(())
737 }
738
739 pub async fn add_route_with_generation(
740 &mut self,
741 definition: RouteDefinition,
742 generation: u64,
743 ) -> Result<(), CamelError> {
744 let route_id = definition.route_id().to_string();
745
746 if self.routes.contains_key(&route_id) {
747 return Err(CamelError::RouteError(format!(
748 "Route '{}' already exists",
749 route_id
750 )));
751 }
752
753 info!(route_id = %route_id, generation, "Adding route to controller with generation");
754
755 let prepared = self.build_managed_route(
756 definition,
757 &super::step_resolution::FunctionStagingMode::HotReload { generation },
758 )?;
759
760 self.routes
761 .insert(prepared.route_id.clone(), prepared.managed);
762
763 Ok(())
764 }
765
766 pub(crate) fn prepare_route_definition_with_generation(
767 &self,
768 definition: RouteDefinition,
769 generation: u64,
770 ) -> Result<PreparedRoute, CamelError> {
771 self.build_managed_route(
772 definition,
773 &super::step_resolution::FunctionStagingMode::HotReload { generation },
774 )
775 }
776
777 pub async fn remove_route_preserving_functions(
778 &mut self,
779 route_id: &str,
780 ) -> Result<(), CamelError> {
781 let managed = self.routes.get(route_id).ok_or_else(|| {
782 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
783 })?;
784 if handle_is_running(&managed.consumer_handle)
785 || handle_is_running(&managed.pipeline_handle)
786 {
787 return Err(CamelError::RouteError(format!(
788 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
789 route_id,
790 inferred_lifecycle_label(managed)
791 )));
792 }
793 self.routes.remove(route_id);
794 info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
795 Ok(())
796 }
797
798 pub fn compile_route_definition(
799 &self,
800 def: RouteDefinition,
801 ) -> Result<BoxProcessor, CamelError> {
802 let route_id = def.route_id().to_string();
803
804 let producer_ctx = self.build_producer_context()?;
805
806 let processors_with_contracts = self.resolve_steps(
807 def.steps,
808 &producer_ctx,
809 &self.registry,
810 Some(&route_id),
811 &super::step_resolution::FunctionStagingMode::DryCompile,
812 )?;
813 let mut pipeline = compose_traced_pipeline_with_contracts(
814 processors_with_contracts,
815 &route_id,
816 self.tracing_enabled,
817 self.tracer_detail_level.clone(),
818 self.tracer_metrics.clone(),
819 );
820
821 if let Some(cb_config) = def.circuit_breaker {
822 let cb_layer = CircuitBreakerLayer::new(cb_config);
823 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
824 }
825
826 let eh_config = def
827 .error_handler
828 .clone()
829 .or_else(|| self.global_error_handler.clone());
830 if let Some(config) = eh_config {
831 let component_ctx = ControllerComponentContext::new(
832 Arc::clone(&self.registry),
833 Arc::clone(&self.languages),
834 self.tracer_metrics
835 .clone()
836 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
837 Arc::clone(&self.platform_service),
838 );
839 let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
840 pipeline = BoxProcessor::new(layer.layer(pipeline));
841 }
842
843 if let Some(uow_config) = &def.unit_of_work {
845 let existing_counter = self
846 .routes
847 .get(&route_id)
848 .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
849
850 let component_ctx = ControllerComponentContext::new(
851 Arc::clone(&self.registry),
852 Arc::clone(&self.languages),
853 self.tracer_metrics
854 .clone()
855 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
856 Arc::clone(&self.platform_service),
857 );
858
859 let (uow_layer, _counter) = self.resolve_uow_layer(
860 uow_config,
861 &producer_ctx,
862 &component_ctx,
863 existing_counter,
864 )?;
865
866 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
867 }
868
869 Ok(pipeline)
870 }
871
872 pub fn compile_route_definition_with_generation(
873 &self,
874 def: RouteDefinition,
875 generation: u64,
876 ) -> Result<BoxProcessor, CamelError> {
877 let route_id = def.route_id().to_string();
878
879 let producer_ctx = self.build_producer_context()?;
880
881 let processors_with_contracts = self.resolve_steps(
882 def.steps,
883 &producer_ctx,
884 &self.registry,
885 Some(&route_id),
886 &super::step_resolution::FunctionStagingMode::HotReload { generation },
887 )?;
888 let mut pipeline = compose_traced_pipeline_with_contracts(
889 processors_with_contracts,
890 &route_id,
891 self.tracing_enabled,
892 self.tracer_detail_level.clone(),
893 self.tracer_metrics.clone(),
894 );
895
896 if let Some(cb_config) = def.circuit_breaker {
897 let cb_layer = CircuitBreakerLayer::new(cb_config);
898 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
899 }
900
901 let eh_config = def
902 .error_handler
903 .clone()
904 .or_else(|| self.global_error_handler.clone());
905 if let Some(config) = eh_config {
906 let component_ctx = ControllerComponentContext::new(
907 Arc::clone(&self.registry),
908 Arc::clone(&self.languages),
909 self.tracer_metrics
910 .clone()
911 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
912 Arc::clone(&self.platform_service),
913 );
914 let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
915 pipeline = BoxProcessor::new(layer.layer(pipeline));
916 }
917
918 if let Some(uow_config) = &def.unit_of_work {
919 let existing_counter = self
920 .routes
921 .get(&route_id)
922 .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
923
924 let component_ctx = ControllerComponentContext::new(
925 Arc::clone(&self.registry),
926 Arc::clone(&self.languages),
927 self.tracer_metrics
928 .clone()
929 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
930 Arc::clone(&self.platform_service),
931 );
932
933 let (uow_layer, _counter) = self.resolve_uow_layer(
934 uow_config,
935 &producer_ctx,
936 &component_ctx,
937 existing_counter,
938 )?;
939
940 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
941 }
942
943 Ok(pipeline)
944 }
945
946 pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
952 let managed = self.routes.get(route_id).ok_or_else(|| {
953 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
954 })?;
955 if handle_is_running(&managed.consumer_handle)
956 || handle_is_running(&managed.pipeline_handle)
957 {
958 return Err(CamelError::RouteError(format!(
959 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
960 route_id,
961 inferred_lifecycle_label(managed)
962 )));
963 }
964 if let Some(invoker) = &self.function_invoker {
965 for (id, rid) in self.collect_function_refs(route_id) {
966 if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
967 warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
968 }
969 }
970 }
971 self.routes.remove(route_id);
972 info!(route_id = %route_id, "Route removed from controller");
973 Ok(())
974 }
975
976 fn collect_function_refs(
977 &self,
978 route_id: &str,
979 ) -> Vec<(camel_api::FunctionId, Option<String>)> {
980 self.function_invoker
981 .as_ref()
982 .map(|invoker| invoker.function_refs_for_route(route_id))
983 .unwrap_or_default()
984 }
985
986 fn discard_function_staging(&self) {
987 if let Some(invoker) = &self.function_invoker {
988 invoker.discard_staging(0);
989 }
990 }
991
    /// Returns the number of routes currently registered, running or not.
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }
996
997 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
998 self.routes.get(route_id).map(|r| {
999 r.in_flight
1000 .as_ref()
1001 .map_or(0, |c| c.load(Ordering::Relaxed))
1002 })
1003 }
1004
    /// Returns `true` when a route with this id is registered.
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.contains_key(route_id)
    }
1009
1010 pub fn route_ids(&self) -> Vec<String> {
1012 self.routes.keys().cloned().collect()
1013 }
1014
1015 pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
1016 self.routes
1017 .get(route_id)
1018 .and_then(|m| m.definition.source_hash())
1019 }
1020
1021 pub fn auto_startup_route_ids(&self) -> Vec<String> {
1023 let mut pairs: Vec<(String, i32)> = self
1024 .routes
1025 .iter()
1026 .filter(|(_, managed)| managed.definition.auto_startup())
1027 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1028 .collect();
1029 pairs.sort_by_key(|(_, order)| *order);
1030 pairs.into_iter().map(|(id, _)| id).collect()
1031 }
1032
1033 pub fn shutdown_route_ids(&self) -> Vec<String> {
1035 let mut pairs: Vec<(String, i32)> = self
1036 .routes
1037 .iter()
1038 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1039 .collect();
1040 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1041 pairs.into_iter().map(|(id, _)| id).collect()
1042 }
1043
1044 pub fn swap_pipeline(
1049 &self,
1050 route_id: &str,
1051 new_pipeline: BoxProcessor,
1052 ) -> Result<(), CamelError> {
1053 let managed = self
1054 .routes
1055 .get(route_id)
1056 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1057
1058 if managed.aggregate_split.is_some() {
1059 tracing::warn!(
1060 route_id = %route_id,
1061 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
1062 );
1063 }
1064
1065 managed
1066 .pipeline
1067 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1068 info!(route_id = %route_id, "Pipeline swapped atomically");
1069 Ok(())
1070 }
1071
1072 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1074 self.routes.get(route_id).map(|r| r.from_uri.clone())
1075 }
1076
1077 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1082 self.routes
1083 .get(route_id)
1084 .map(|r| r.pipeline.load().0.clone())
1085 }
1086
    /// Stops a route by delegating to `consumer_management::stop_route_internal`
    /// with the controller's route map.
    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
    }
1091
    /// Starts a route as part of a reload; forwards to `start_route`.
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }
1095
    /// Stops a route as part of a reload; forwards to `stop_route`.
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
1099}
1100
1101#[async_trait::async_trait]
1102impl RouteController for DefaultRouteController {
1103 async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1104 {
1106 let managed = self
1107 .routes
1108 .get_mut(route_id)
1109 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1110
1111 let consumer_running = handle_is_running(&managed.consumer_handle);
1112 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1113 if consumer_running && pipeline_running {
1114 return Ok(());
1115 }
1116 if !consumer_running && pipeline_running {
1117 return Err(CamelError::RouteError(format!(
1118 "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
1119 route_id
1120 )));
1121 }
1122 if consumer_running && !pipeline_running {
1123 return Err(CamelError::RouteError(format!(
1124 "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
1125 route_id
1126 )));
1127 }
1128 }
1129
1130 info!(route_id = %route_id, "Starting route");
1131
1132 let (from_uri, pipeline, concurrency) = {
1134 let managed = self
1135 .routes
1136 .get(route_id)
1137 .expect("invariant: route must exist after prior existence check");
1138 (
1139 managed.from_uri.clone(),
1140 Arc::clone(&managed.pipeline),
1141 managed.concurrency.clone(),
1142 )
1143 };
1144
1145 let crash_notifier = self.crash_notifier.clone();
1147 let runtime_for_consumer = self.runtime.clone();
1148
1149 let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
1150 &self.registry,
1151 &from_uri,
1152 &ControllerComponentContext::new(
1153 Arc::clone(&self.registry),
1154 Arc::clone(&self.languages),
1155 self.tracer_metrics
1156 .clone()
1157 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1158 Arc::clone(&self.platform_service),
1159 ),
1160 )?;
1161
1162 let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1164
1165 let managed = self
1167 .routes
1168 .get_mut(route_id)
1169 .expect("invariant: route must exist after prior existence check");
1170
1171 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1173 let consumer_cancel = managed.consumer_cancel_token.child_token();
1175 let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1176 let tx_for_storage = tx.clone();
1178 let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1179
1180 let managed = self
1182 .routes
1183 .get_mut(route_id)
1184 .expect("invariant: route must exist after prior existence check");
1185
1186 if let Some(split) = managed.aggregate_split.as_ref() {
1187 let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
1188
1189 let route_cancel_clone = pipeline_cancel.clone();
1190 let svc = AggregatorService::new(
1191 split.agg_config.clone(),
1192 late_tx,
1193 Arc::clone(&self.languages),
1194 route_cancel_clone,
1195 );
1196 let agg = Arc::new(std::sync::Mutex::new(svc));
1197
1198 managed.agg_service = Some(Arc::clone(&agg));
1199
1200 let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
1201 let pre_pipeline = Arc::clone(&split.pre_pipeline);
1202 let post_pipeline = Arc::clone(&split.post_pipeline);
1203
1204 let pipeline_handle = tokio::spawn(async move {
1206 loop {
1207 tokio::select! {
1208 biased;
1209
1210 late_ex = async {
1211 let mut rx = late_rx.lock().await;
1212 rx.recv().await
1213 } => {
1214 match late_ex {
1215 Some(ex) => {
1216 let pipe = post_pipeline.load();
1217 if let Err(e) = pipe.0.clone().oneshot(ex).await {
1218 tracing::warn!(error = %e, "late exchange post-pipeline failed");
1219 }
1220 }
1221 None => return,
1222 }
1223 }
1224
1225 envelope_opt = rx.recv() => {
1226 match envelope_opt {
1227 Some(envelope) => {
1228 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1229 let pre_pipe = pre_pipeline.load();
1230 let ex = match pre_pipe.0.clone().oneshot(exchange).await {
1231 Ok(ex) => ex,
1232 Err(e) => {
1233 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1234 continue;
1235 }
1236 };
1237
1238 let ex = {
1239 let cloned_svc = agg
1240 .lock()
1241 .expect("mutex poisoned: another thread panicked while holding this lock")
1242 .clone();
1243 cloned_svc.oneshot(ex).await
1244 };
1245
1246 match ex {
1247 Ok(ex) => {
1248 if !is_pending(&ex) {
1249 let post_pipe = post_pipeline.load();
1250 let out = post_pipe.0.clone().oneshot(ex).await;
1251 if let Some(tx) = reply_tx { let _ = tx.send(out); }
1252 } else if let Some(tx) = reply_tx {
1253 let _ = tx.send(Ok(ex));
1254 }
1255 }
1256 Err(e) => {
1257 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1258 }
1259 }
1260 }
1261 None => return,
1262 }
1263 }
1264
1265 _ = pipeline_cancel.cancelled() => {
1266 {
1267 let guard = agg
1268 .lock()
1269 .expect("mutex poisoned: another thread panicked while holding this lock");
1270 guard.force_complete_all();
1271 }
1272 let mut rx_guard = late_rx.lock().await;
1273 while let Ok(late_ex) = rx_guard.try_recv() {
1274 let pipe = post_pipeline.load();
1275 let _ = pipe.0.clone().oneshot(late_ex).await;
1276 }
1277 break;
1278 }
1279 }
1280 }
1281 });
1282 #[cfg(test)]
1283 emit_start_route_event("pipeline_spawned");
1284
1285 let consumer_handle = super::consumer_management::spawn_consumer_task(
1288 route_id.to_string(),
1289 consumer,
1290 consumer_ctx,
1291 crash_notifier,
1292 runtime_for_consumer,
1293 false,
1294 );
1295 #[cfg(test)]
1296 emit_start_route_event("consumer_spawned");
1297
1298 let managed = self
1299 .routes
1300 .get_mut(route_id)
1301 .expect("invariant: route must exist");
1302 managed.consumer_handle = Some(consumer_handle);
1303 managed.pipeline_handle = Some(pipeline_handle);
1304 managed.channel_sender = Some(tx_for_storage);
1305
1306 info!(route_id = %route_id, "Route started (aggregate with timeout)");
1307 return Ok(());
1308 }
1309 let pipeline_handle = match effective_concurrency {
1313 ConcurrencyModel::Sequential => {
1314 tokio::spawn(async move {
1315 loop {
1316 let envelope = tokio::select! {
1318 envelope = rx.recv() => match envelope {
1319 Some(e) => e,
1320 None => return, },
1322 _ = pipeline_cancel.cancelled() => {
1323 return;
1325 }
1326 };
1327 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1328
1329 let mut pipeline = pipeline.load().0.clone();
1331
1332 if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1333 if let Some(tx) = reply_tx {
1334 let _ = tx.send(Err(e));
1335 }
1336 return;
1337 }
1338
1339 let result = pipeline.call(exchange).await;
1340 if let Some(tx) = reply_tx {
1341 let _ = tx.send(result);
1342 } else if let Err(ref e) = result
1343 && !matches!(e, CamelError::Stopped)
1344 {
1345 error!("Pipeline error: {e}");
1346 }
1347 }
1348 })
1349 }
1350 ConcurrencyModel::Concurrent { max } => {
1351 let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1352 tokio::spawn(async move {
1353 loop {
1354 let envelope = tokio::select! {
1356 envelope = rx.recv() => match envelope {
1357 Some(e) => e,
1358 None => return, },
1360 _ = pipeline_cancel.cancelled() => {
1361 return;
1363 }
1364 };
1365 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1366 let pipe_ref = Arc::clone(&pipeline);
1367 let sem = sem.clone();
1368 let cancel = pipeline_cancel.clone();
1369 tokio::spawn(async move {
1370 let _permit = match &sem {
1372 Some(s) => Some(s.acquire().await.expect("semaphore closed")),
1373 None => None,
1374 };
1375
1376 let mut pipe = pipe_ref.load().0.clone();
1378
1379 if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1381 if let Some(tx) = reply_tx {
1382 let _ = tx.send(Err(e));
1383 }
1384 return;
1385 }
1386
1387 let result = pipe.call(exchange).await;
1388 if let Some(tx) = reply_tx {
1389 let _ = tx.send(result);
1390 } else if let Err(ref e) = result
1391 && !matches!(e, CamelError::Stopped)
1392 {
1393 error!("Pipeline error: {e}");
1394 }
1395 });
1396 }
1397 })
1398 }
1399 };
1400 #[cfg(test)]
1401 emit_start_route_event("pipeline_spawned");
1402
1403 let consumer_handle = super::consumer_management::spawn_consumer_task(
1406 route_id.to_string(),
1407 consumer,
1408 consumer_ctx,
1409 crash_notifier,
1410 runtime_for_consumer,
1411 false,
1412 );
1413 #[cfg(test)]
1414 emit_start_route_event("consumer_spawned");
1415
1416 let managed = self
1418 .routes
1419 .get_mut(route_id)
1420 .expect("invariant: route must exist after prior existence check");
1421 managed.consumer_handle = Some(consumer_handle);
1422 managed.pipeline_handle = Some(pipeline_handle);
1423 managed.channel_sender = Some(tx_for_storage);
1424
1425 info!(route_id = %route_id, "Route started");
1426 Ok(())
1427 }
1428
    /// Stops the route identified by `route_id`.
    ///
    /// This is a thin entry point: all of the work is delegated to
    /// `stop_route_internal`, and its result is returned unchanged.
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }
1432
1433 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1434 self.stop_route(route_id).await?;
1435 tokio::time::sleep(Duration::from_millis(100)).await;
1436 self.start_route(route_id).await
1437 }
1438
1439 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1440 let managed = self
1442 .routes
1443 .get_mut(route_id)
1444 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1445
1446 let consumer_running = handle_is_running(&managed.consumer_handle);
1447 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1448
1449 if !consumer_running || !pipeline_running {
1451 return Err(CamelError::RouteError(format!(
1452 "Cannot suspend route '{}' with execution lifecycle {}",
1453 route_id,
1454 inferred_lifecycle_label(managed)
1455 )));
1456 }
1457
1458 info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1459
1460 let managed = self
1462 .routes
1463 .get_mut(route_id)
1464 .expect("invariant: route must exist after prior existence check");
1465 managed.consumer_cancel_token.cancel();
1466
1467 let managed = self
1469 .routes
1470 .get_mut(route_id)
1471 .expect("invariant: route must exist after prior existence check");
1472 let consumer_handle = managed.consumer_handle.take();
1473
1474 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1476 if let Some(handle) = consumer_handle {
1477 let _ = handle.await;
1478 }
1479 })
1480 .await;
1481
1482 if timeout_result.is_err() {
1483 warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1484 }
1485
1486 let managed = self
1488 .routes
1489 .get_mut(route_id)
1490 .expect("invariant: route must exist after prior existence check");
1491
1492 managed.consumer_cancel_token = CancellationToken::new();
1494
1495 info!(route_id = %route_id, "Route suspended (pipeline still running)");
1496 Ok(())
1497 }
1498
1499 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1500 let managed = self
1502 .routes
1503 .get(route_id)
1504 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1505
1506 let consumer_running = handle_is_running(&managed.consumer_handle);
1507 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1508 if consumer_running || !pipeline_running {
1509 return Err(CamelError::RouteError(format!(
1510 "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1511 route_id,
1512 inferred_lifecycle_label(managed)
1513 )));
1514 }
1515
1516 let sender = managed.channel_sender.clone().ok_or_else(|| {
1518 CamelError::RouteError("Suspended route has no channel sender".into())
1519 })?;
1520
1521 let from_uri = managed.from_uri.clone();
1523
1524 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1525
1526 let (consumer, _) = super::consumer_management::create_route_consumer(
1527 &self.registry,
1528 &from_uri,
1529 &ControllerComponentContext::new(
1530 Arc::clone(&self.registry),
1531 Arc::clone(&self.languages),
1532 self.tracer_metrics
1533 .clone()
1534 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1535 Arc::clone(&self.platform_service),
1536 ),
1537 )?;
1538
1539 let managed = self
1541 .routes
1542 .get_mut(route_id)
1543 .expect("invariant: route must exist after prior existence check");
1544
1545 let consumer_cancel = managed.consumer_cancel_token.child_token();
1547
1548 let crash_notifier = self.crash_notifier.clone();
1549 let runtime_for_consumer = self.runtime.clone();
1550
1551 let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1553
1554 let consumer_handle = super::consumer_management::spawn_consumer_task(
1556 route_id.to_string(),
1557 consumer,
1558 consumer_ctx,
1559 crash_notifier,
1560 runtime_for_consumer,
1561 true,
1562 );
1563
1564 let managed = self
1566 .routes
1567 .get_mut(route_id)
1568 .expect("invariant: route must exist after prior existence check");
1569 managed.consumer_handle = Some(consumer_handle);
1570
1571 info!(route_id = %route_id, "Route resumed");
1572 Ok(())
1573 }
1574
1575 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1576 let route_ids: Vec<String> = {
1579 let mut pairs: Vec<_> = self
1580 .routes
1581 .iter()
1582 .filter(|(_, r)| r.definition.auto_startup())
1583 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1584 .collect();
1585 pairs.sort_by_key(|(_, order)| *order);
1586 pairs.into_iter().map(|(id, _)| id).collect()
1587 };
1588
1589 info!("Starting {} auto-startup routes", route_ids.len());
1590
1591 let mut errors: Vec<String> = Vec::new();
1593 for route_id in route_ids {
1594 if let Err(e) = self.start_route(&route_id).await {
1595 errors.push(format!("Route '{}': {}", route_id, e));
1596 }
1597 }
1598
1599 if !errors.is_empty() {
1600 return Err(CamelError::RouteError(format!(
1601 "Failed to start routes: {}",
1602 errors.join(", ")
1603 )));
1604 }
1605
1606 info!("All auto-startup routes started");
1607 Ok(())
1608 }
1609
1610 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1611 let route_ids: Vec<String> = {
1613 let mut pairs: Vec<_> = self
1614 .routes
1615 .iter()
1616 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1617 .collect();
1618 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1619 pairs.into_iter().map(|(id, _)| id).collect()
1620 };
1621
1622 info!("Stopping {} routes", route_ids.len());
1623
1624 for route_id in route_ids {
1625 let _ = self.stop_route(&route_id).await;
1626 }
1627
1628 info!("All routes stopped");
1629 Ok(())
1630 }
1631}
1632
1633#[cfg(test)]
1634#[path = "route_controller_tests.rs"]
1635mod tests;