1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21 BoxProcessor, CamelError, Exchange, IdentityProcessor, NoOpMetrics, NoopPlatformService,
22 PlatformService, ProducerContext, RouteController, RuntimeCommand, RuntimeHandle,
23};
24use camel_component_api::{
25 ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
26};
27use camel_endpoint::parse_uri;
28pub use camel_processor::aggregator::SharedLanguageRegistry;
29use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
30use camel_processor::circuit_breaker::CircuitBreakerLayer;
31use camel_processor::error_handler::ErrorHandlerLayer;
32
33use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
34use crate::lifecycle::adapters::route_compiler::{
35 compose_pipeline, compose_traced_pipeline_with_contracts,
36};
37use crate::lifecycle::application::route_definition::{
38 BuilderStep, RouteDefinition, RouteDefinitionInfo,
39};
40use crate::shared::components::domain::Registry;
41use crate::shared::observability::domain::{DetailLevel, TracerConfig};
42use arc_swap::ArcSwap;
43use camel_bean::BeanRegistry;
44
/// Message sent over the crash-notification channel (see
/// `DefaultRouteController::set_crash_notifier`) when a route's consumer
/// task terminates abnormally.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// Identifier of the route whose consumer crashed.
    pub route_id: String,
    /// Human-readable description of the failure.
    pub error: String,
}
53
/// Newtype wrapper asserting that a `BoxProcessor` may be shared across
/// threads, so it can be stored inside an `ArcSwap` (see `SharedPipeline`).
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY(review): this asserts `BoxProcessor` contains no thread-unsafe
// interior mutability. Within this module the wrapper is only read via
// `ArcSwap::load` and its inner processor is cloned before use — TODO
// confirm that `BoxProcessor`'s contents are genuinely safe to share.
unsafe impl Sync for SyncBoxProcessor {}
70
/// Hot-swappable pipeline handle: readers `load()` the current processor,
/// and `swap_pipeline` `store()`s a replacement atomically.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;

// Test-only observer invoked at key points inside `start_route` so tests can
// assert the relative ordering of pipeline/consumer task spawning.
#[cfg(test)]
type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;

#[cfg(test)]
static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
79
/// Installs (or clears, with `None`) the test-only observer invoked by
/// `emit_start_route_event`.
#[cfg(test)]
fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
    let mut slot = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    *slot = hook;
}
86
/// Notifies the registered test hook, if any, that `event` occurred during
/// route startup.
#[cfg(test)]
fn emit_start_route_event(event: &'static str) {
    let guard = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    if let Some(hook) = guard.as_ref() {
        hook(event);
    }
}
97
/// Pipeline segments for a route whose top-level aggregate step has a
/// timeout completion condition; such routes run a pre-aggregate and a
/// post-aggregate pipeline around a dedicated aggregator service
/// (see `add_route` and `start_route`).
pub(super) struct AggregateSplitInfo {
    /// Steps that run before the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    /// Steps that run after an aggregation completes (also used for
    /// timeout-completed "late" exchanges).
    pub(super) post_pipeline: SharedPipeline,
}
104
105pub(super) struct ManagedRoute {
106 pub(super) definition: RouteDefinitionInfo,
108 pub(super) from_uri: String,
110 pub(super) pipeline: SharedPipeline,
112 pub(super) concurrency: Option<ConcurrencyModel>,
114 pub(super) consumer_handle: Option<JoinHandle<()>>,
116 pub(super) pipeline_handle: Option<JoinHandle<()>>,
118 pub(super) consumer_cancel_token: CancellationToken,
121 pub(super) pipeline_cancel_token: CancellationToken,
124 pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
127 pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
129 pub(super) aggregate_split: Option<AggregateSplitInfo>,
130 pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
131}
132
133pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
134 handle.as_ref().is_some_and(|h| !h.is_finished())
135}
136
137fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
138 match (
139 handle_is_running(&managed.consumer_handle),
140 handle_is_running(&managed.pipeline_handle),
141 ) {
142 (true, true) => "Started",
143 (false, true) => "Suspended",
144 (true, false) => "Stopping",
145 (false, false) => "Stopped",
146 }
147}
148
149fn find_top_level_aggregate_with_timeout(
150 steps: &[BuilderStep],
151) -> Option<(usize, AggregatorConfig)> {
152 for (i, step) in steps.iter().enumerate() {
153 if let BuilderStep::Aggregate { config } = step {
154 if has_timeout_condition(&config.completion) {
155 return Some((i, config.clone()));
156 }
157 break;
158 }
159 }
160 None
161}
162
/// `ComponentContext` implementation backed by the controller's shared
/// registries; handed to components/endpoints during resolution.
pub(crate) struct ControllerComponentContext {
    // Component registry keyed by URI scheme.
    registry: Arc<std::sync::Mutex<Registry>>,
    // Expression-language registry.
    languages: SharedLanguageRegistry,
    // Metrics sink exposed to components.
    metrics: Arc<dyn MetricsCollector>,
    // Platform service exposed to components.
    platform_service: Arc<dyn PlatformService>,
}
169
170impl ControllerComponentContext {
171 pub(crate) fn new(
172 registry: Arc<std::sync::Mutex<Registry>>,
173 languages: SharedLanguageRegistry,
174 metrics: Arc<dyn MetricsCollector>,
175 platform_service: Arc<dyn PlatformService>,
176 ) -> Self {
177 Self {
178 registry,
179 languages,
180 metrics,
181 platform_service,
182 }
183 }
184}
185
186impl ComponentContext for ControllerComponentContext {
187 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
188 self.registry.lock().ok()?.get(scheme)
189 }
190
191 fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
192 self.languages.lock().ok()?.get(name).cloned()
193 }
194
195 fn metrics(&self) -> Arc<dyn MetricsCollector> {
196 Arc::clone(&self.metrics)
197 }
198
199 fn platform_service(&self) -> Arc<dyn PlatformService> {
200 Arc::clone(&self.platform_service)
201 }
202}
203
204fn is_pending(ex: &Exchange) -> bool {
205 ex.property("CamelAggregatorPending")
206 .and_then(|v| v.as_bool())
207 .unwrap_or(false)
208}
209
/// Polls the pipeline's readiness, retrying once per second while the
/// circuit breaker reports open, until the pipeline is ready, a fatal error
/// occurs, or `cancel` fires (in which case the last `CircuitOpen` error is
/// surfaced to the caller).
async fn ready_with_backoff(
    pipeline: &mut BoxProcessor,
    cancel: &CancellationToken,
) -> Result<(), CamelError> {
    loop {
        match pipeline.ready().await {
            Ok(_) => return Ok(()),
            Err(CamelError::CircuitOpen(ref msg)) => {
                warn!("Circuit open, backing off: {msg}");
                tokio::select! {
                    // Fixed one-second pause between readiness probes.
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                        continue;
                    }
                    // Cancellation wins: report the circuit-open error upward.
                    _ = cancel.cancelled() => {
                        return Err(CamelError::CircuitOpen(msg.clone()));
                    }
                }
            }
            // Any other readiness failure is fatal for this exchange.
            Err(e) => {
                error!("Pipeline not ready: {e}");
                return Err(e);
            }
        }
    }
}
242
243fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
244 let stamp = std::time::SystemTime::now()
245 .duration_since(std::time::UNIX_EPOCH)
246 .unwrap_or_default()
247 .as_nanos();
248 RuntimeCommand::FailRoute {
249 route_id: route_id.to_string(),
250 error: error.to_string(),
251 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
252 causation_id: None,
253 }
254}
255
256pub(super) async fn publish_runtime_failure(
257 runtime: Option<Weak<dyn RuntimeHandle>>,
258 route_id: &str,
259 error: &str,
260) {
261 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
262 return;
263 };
264 let command = runtime_failure_command(route_id, error);
265 if let Err(runtime_error) = runtime.execute(command).await {
266 warn!(
267 route_id = %route_id,
268 error = %runtime_error,
269 "failed to synchronize route crash with runtime projection"
270 );
271 }
272}
273
/// Default `RouteController` implementation: owns the route table and the
/// shared registries used to compile route definitions into pipelines, and
/// manages each route's consumer/pipeline task lifecycle.
pub struct DefaultRouteController {
    // Route id -> managed route state.
    routes: HashMap<String, ManagedRoute>,
    // Component registry keyed by URI scheme.
    registry: Arc<std::sync::Mutex<Registry>>,
    // Expression-language registry shared with compiled processors.
    languages: SharedLanguageRegistry,
    // Bean registry used during step resolution.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    // Weak handle to the runtime, for mirroring crashes into its projection.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    // Controller-wide error handler for routes without their own.
    global_error_handler: Option<ErrorHandlerConfig>,
    // Channel consumer tasks use to report crashes.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    // Tracer settings applied when compiling pipelines.
    tracing_enabled: bool,
    tracer_detail_level: DetailLevel,
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    // Platform service handed to components during resolution.
    platform_service: Arc<dyn PlatformService>,
}
304
305impl DefaultRouteController {
306 pub fn new(
308 registry: Arc<std::sync::Mutex<Registry>>,
309 platform_service: Arc<dyn PlatformService>,
310 ) -> Self {
311 Self::with_beans_and_platform_service(
312 registry,
313 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
314 platform_service,
315 )
316 }
317
318 pub fn with_beans(
320 registry: Arc<std::sync::Mutex<Registry>>,
321 beans: Arc<std::sync::Mutex<BeanRegistry>>,
322 ) -> Self {
323 Self::with_beans_and_platform_service(
324 registry,
325 beans,
326 Arc::new(NoopPlatformService::default()),
327 )
328 }
329
330 fn with_beans_and_platform_service(
331 registry: Arc<std::sync::Mutex<Registry>>,
332 beans: Arc<std::sync::Mutex<BeanRegistry>>,
333 platform_service: Arc<dyn PlatformService>,
334 ) -> Self {
335 Self {
336 routes: HashMap::new(),
337 registry,
338 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
339 beans,
340 runtime: None,
341 global_error_handler: None,
342 crash_notifier: None,
343 tracing_enabled: false,
344 tracer_detail_level: DetailLevel::Minimal,
345 tracer_metrics: None,
346 platform_service,
347 }
348 }
349
350 pub fn with_languages(
352 registry: Arc<std::sync::Mutex<Registry>>,
353 languages: SharedLanguageRegistry,
354 platform_service: Arc<dyn PlatformService>,
355 ) -> Self {
356 Self {
357 routes: HashMap::new(),
358 registry,
359 languages,
360 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
361 runtime: None,
362 global_error_handler: None,
363 crash_notifier: None,
364 tracing_enabled: false,
365 tracer_detail_level: DetailLevel::Minimal,
366 tracer_metrics: None,
367 platform_service,
368 }
369 }
370
371 pub fn with_languages_and_beans(
372 registry: Arc<std::sync::Mutex<Registry>>,
373 languages: SharedLanguageRegistry,
374 platform_service: Arc<dyn PlatformService>,
375 beans: Arc<std::sync::Mutex<BeanRegistry>>,
376 ) -> Self {
377 Self {
378 routes: HashMap::new(),
379 registry,
380 languages,
381 beans,
382 runtime: None,
383 global_error_handler: None,
384 crash_notifier: None,
385 tracing_enabled: false,
386 tracer_detail_level: DetailLevel::Minimal,
387 tracer_metrics: None,
388 platform_service,
389 }
390 }
391
    /// Stores a weak reference to the runtime so route crashes can be
    /// mirrored into its projection without keeping the runtime alive.
    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.runtime = Some(Arc::downgrade(&runtime));
    }
396
    /// Registers the channel that consumer tasks use to report crashes.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }
404
    /// Sets the controller-wide error handler, used by routes that do not
    /// define their own.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }
409
    /// Applies tracer settings used when compiling route pipelines; only
    /// routes added/compiled after this call pick up the new settings.
    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
        self.tracing_enabled = config.enabled;
        self.tracer_detail_level = config.detail_level.clone();
        self.tracer_metrics = config.metrics_collector.clone();
    }
416
417 fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
418 let mut producer_ctx = ProducerContext::new();
419 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
420 producer_ctx = producer_ctx.with_runtime(runtime);
421 }
422 Ok(producer_ctx)
423 }
424
425 fn resolve_error_handler(
427 &self,
428 config: ErrorHandlerConfig,
429 producer_ctx: &ProducerContext,
430 component_ctx: &dyn ComponentContext,
431 ) -> Result<ErrorHandlerLayer, CamelError> {
432 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
434 let parsed = parse_uri(uri)?;
435 let component = component_ctx
436 .resolve_component(&parsed.scheme)
437 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
438 let endpoint = component.create_endpoint(uri, component_ctx)?;
439 Some(endpoint.create_producer(producer_ctx)?)
440 } else {
441 None
442 };
443
444 let mut resolved_policies = Vec::new();
446 for policy in config.policies {
447 let handler_producer = if let Some(ref uri) = policy.handled_by {
448 let parsed = parse_uri(uri)?;
449 let component = component_ctx
450 .resolve_component(&parsed.scheme)
451 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
452 let endpoint = component.create_endpoint(uri, component_ctx)?;
453 Some(endpoint.create_producer(producer_ctx)?)
454 } else {
455 None
456 };
457 resolved_policies.push((policy, handler_producer));
458 }
459
460 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
461 }
462
463 fn resolve_uow_layer(
466 &self,
467 config: &UnitOfWorkConfig,
468 producer_ctx: &ProducerContext,
469 component_ctx: &dyn ComponentContext,
470 counter: Option<Arc<AtomicU64>>,
471 ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
472 let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
473 let parsed = parse_uri(uri)?;
474 let component = component_ctx
475 .resolve_component(&parsed.scheme)
476 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
477 let endpoint = component.create_endpoint(uri, component_ctx)?;
478 endpoint.create_producer(producer_ctx).map_err(|e| {
479 CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
480 })
481 };
482
483 let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
484 let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
485
486 let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
487 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
488 Ok((layer, counter))
489 }
490
491 pub(crate) fn resolve_steps(
493 &self,
494 steps: Vec<BuilderStep>,
495 producer_ctx: &ProducerContext,
496 registry: &Arc<std::sync::Mutex<Registry>>,
497 ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
498 let component_ctx = Arc::new(ControllerComponentContext::new(
499 Arc::clone(registry),
500 Arc::clone(&self.languages),
501 self.tracer_metrics
502 .clone()
503 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
504 Arc::clone(&self.platform_service),
505 ));
506
507 super::step_resolution::resolve_steps(
508 steps,
509 producer_ctx,
510 registry,
511 &self.languages,
512 &self.beans,
513 component_ctx,
514 )
515 }
516
    /// Compiles a `RouteDefinition` into a managed (but not yet started)
    /// route: resolves steps, wraps the pipeline with circuit-breaker,
    /// error-handler and unit-of-work layers, and stores the result under
    /// the route id.
    ///
    /// # Errors
    /// Fails when the route id already exists or when any step or URI in
    /// the definition cannot be resolved.
    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
        let route_id = definition.route_id().to_string();

        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, "Adding route to controller");

        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        let producer_ctx = self.build_producer_context()?;

        // An aggregate step with a timeout condition cannot run inside one
        // linear pipeline: the route is split into pre/post segments around
        // the aggregator, which runs as its own service (see `start_route`).
        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
            Some((idx, agg_config)) => {
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                // Drop the aggregate step itself; its config is kept in
                // `agg_config`.
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // Main pipeline stays empty; execution flows through the
                // split segments instead.
                vec![]
            }
            None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
        };
        let route_id_for_tracing = route_id.clone();
        let mut pipeline = if processors_with_contracts.is_empty() {
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        // Layer order (innermost -> outermost): circuit breaker, error
        // handler, unit of work.
        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // A route-local error handler wins over the controller-wide default.
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Unit of work also yields the in-flight counter stored on the route.
        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let (uow_layer, counter) =
                self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        self.routes.insert(
            route_id.clone(),
            ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
            },
        );

        Ok(())
    }
658
    /// Compiles a route definition into a standalone pipeline without
    /// registering it — used for hot-reload. When a route with the same id
    /// is already registered, its in-flight counter is reused so the
    /// unit-of-work count stays continuous across the swap.
    pub fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<BoxProcessor, CamelError> {
        let route_id = def.route_id().to_string();

        let producer_ctx = self.build_producer_context()?;

        let processors_with_contracts =
            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
        let mut pipeline = compose_traced_pipeline_with_contracts(
            processors_with_contracts,
            &route_id,
            self.tracing_enabled,
            self.tracer_detail_level.clone(),
            self.tracer_metrics.clone(),
        );

        // Same layering order as `add_route`: circuit breaker, then error
        // handler, then unit of work.
        if let Some(cb_config) = def.circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        let eh_config = def
            .error_handler
            .clone()
            .or_else(|| self.global_error_handler.clone());
        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        if let Some(uow_config) = &def.unit_of_work {
            // Reuse the registered route's counter, if any, so in-flight
            // counts survive the pipeline swap.
            let existing_counter = self
                .routes
                .get(&route_id)
                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));

            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );

            let (uow_layer, _counter) = self.resolve_uow_layer(
                uow_config,
                &producer_ctx,
                &component_ctx,
                existing_counter,
            )?;

            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
        }

        Ok(pipeline)
    }
731
732 pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
738 let managed = self.routes.get(route_id).ok_or_else(|| {
739 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
740 })?;
741 if handle_is_running(&managed.consumer_handle)
742 || handle_is_running(&managed.pipeline_handle)
743 {
744 return Err(CamelError::RouteError(format!(
745 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
746 route_id,
747 inferred_lifecycle_label(managed)
748 )));
749 }
750 self.routes.remove(route_id);
751 info!(route_id = %route_id, "Route removed from controller");
752 Ok(())
753 }
754
    /// Number of registered routes (running or not).
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }
759
760 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
761 self.routes.get(route_id).map(|r| {
762 r.in_flight
763 .as_ref()
764 .map_or(0, |c| c.load(Ordering::Relaxed))
765 })
766 }
767
    /// True when a route with this id is registered.
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.contains_key(route_id)
    }
772
773 pub fn route_ids(&self) -> Vec<String> {
775 self.routes.keys().cloned().collect()
776 }
777
778 pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
779 self.routes
780 .get(route_id)
781 .and_then(|m| m.definition.source_hash())
782 }
783
784 pub fn auto_startup_route_ids(&self) -> Vec<String> {
786 let mut pairs: Vec<(String, i32)> = self
787 .routes
788 .iter()
789 .filter(|(_, managed)| managed.definition.auto_startup())
790 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
791 .collect();
792 pairs.sort_by_key(|(_, order)| *order);
793 pairs.into_iter().map(|(id, _)| id).collect()
794 }
795
796 pub fn shutdown_route_ids(&self) -> Vec<String> {
798 let mut pairs: Vec<(String, i32)> = self
799 .routes
800 .iter()
801 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
802 .collect();
803 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
804 pairs.into_iter().map(|(id, _)| id).collect()
805 }
806
807 pub fn swap_pipeline(
812 &self,
813 route_id: &str,
814 new_pipeline: BoxProcessor,
815 ) -> Result<(), CamelError> {
816 let managed = self
817 .routes
818 .get(route_id)
819 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
820
821 if managed.aggregate_split.is_some() {
822 tracing::warn!(
823 route_id = %route_id,
824 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
825 );
826 }
827
828 managed
829 .pipeline
830 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
831 info!(route_id = %route_id, "Pipeline swapped atomically");
832 Ok(())
833 }
834
835 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
837 self.routes.get(route_id).map(|r| r.from_uri.clone())
838 }
839
840 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
845 self.routes
846 .get(route_id)
847 .map(|r| r.pipeline.load().0.clone())
848 }
849
    /// Delegates the full stop sequence (cancel consumer and pipeline,
    /// await their handles) to the consumer-management module.
    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
    }
854
    /// Hot-reload entry point; currently identical to `start_route`.
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }
858
    /// Hot-reload entry point; currently identical to `stop_route`.
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
862}
863
864#[async_trait::async_trait]
865impl RouteController for DefaultRouteController {
    /// Starts a registered route: creates its consumer, spawns the pipeline
    /// task(s) and wires them together through a bounded channel.
    ///
    /// Idempotent when the route is already fully running. Errors when the
    /// route is suspended (use `resume_route`) or in an inconsistent
    /// half-running state (stop, then start again).
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Pre-flight state check in its own scope so the &mut borrow ends
        // before we clone pieces out of the route entry below.
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            if consumer_running && pipeline_running {
                return Ok(());
            }
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
        }

        info!(route_id = %route_id, "Starting route");

        // Snapshot what the spawned tasks need; the pipeline is shared via
        // ArcSwap so later swaps are picked up per exchange.
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            ),
        )?;

        // An explicit route-level concurrency model overrides the
        // consumer's default.
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Bounded channel between consumer and pipeline task(s); child
        // tokens let suspend/stop cancel each side independently.
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Aggregate-with-timeout routes run a dedicated event loop: pre
        // pipeline -> aggregator service -> post pipeline, plus a side
        // channel for aggregations completed by timeout ("late" exchanges).
        if let Some(split) = managed.aggregate_split.as_ref() {
            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);

            let route_cancel_clone = pipeline_cancel.clone();
            let svc = AggregatorService::new(
                split.agg_config.clone(),
                late_tx,
                Arc::clone(&self.languages),
                route_cancel_clone,
            );
            let agg = Arc::new(std::sync::Mutex::new(svc));

            managed.agg_service = Some(Arc::clone(&agg));

            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
            let pre_pipeline = Arc::clone(&split.pre_pipeline);
            let post_pipeline = Arc::clone(&split.post_pipeline);

            let pipeline_handle = tokio::spawn(async move {
                loop {
                    tokio::select! {
                        // `biased` so timeout-completed exchanges drain before
                        // new input is accepted.
                        biased;

                        late_ex = async {
                            let mut rx = late_rx.lock().await;
                            rx.recv().await
                        } => {
                            match late_ex {
                                Some(ex) => {
                                    let pipe = post_pipeline.load();
                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
                                    }
                                }
                                None => return,
                            }
                        }

                        envelope_opt = rx.recv() => {
                            match envelope_opt {
                                Some(envelope) => {
                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
                                    let pre_pipe = pre_pipeline.load();
                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
                                        Ok(ex) => ex,
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                            continue;
                                        }
                                    };

                                    // Clone the service out of the std mutex so
                                    // the lock is not held across the await.
                                    let ex = {
                                        let cloned_svc = agg
                                            .lock()
                                            .expect("mutex poisoned: another thread panicked while holding this lock")
                                            .clone();
                                        cloned_svc.oneshot(ex).await
                                    };

                                    match ex {
                                        Ok(ex) => {
                                            // Pending exchanges stay buffered in
                                            // the aggregator; only completed ones
                                            // run the post pipeline.
                                            if !is_pending(&ex) {
                                                let post_pipe = post_pipeline.load();
                                                let out = post_pipe.0.clone().oneshot(ex).await;
                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
                                            } else if let Some(tx) = reply_tx {
                                                let _ = tx.send(Ok(ex));
                                            }
                                        }
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                        }
                                    }
                                }
                                None => return,
                            }
                        }

                        _ = pipeline_cancel.cancelled() => {
                            // Shutdown flush: force-complete open aggregations,
                            // then run already-queued late exchanges.
                            {
                                let guard = agg
                                    .lock()
                                    .expect("mutex poisoned: another thread panicked while holding this lock");
                                guard.force_complete_all();
                            }
                            let mut rx_guard = late_rx.lock().await;
                            while let Ok(late_ex) = rx_guard.try_recv() {
                                let pipe = post_pipeline.load();
                                let _ = pipe.0.clone().oneshot(late_ex).await;
                            }
                            break;
                        }
                    }
                }
            });
            #[cfg(test)]
            emit_start_route_event("pipeline_spawned");

            let consumer_handle = super::consumer_management::spawn_consumer_task(
                route_id.to_string(),
                consumer,
                consumer_ctx,
                crash_notifier,
                runtime_for_consumer,
                false,
            );
            #[cfg(test)]
            emit_start_route_event("consumer_spawned");

            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist");
            managed.consumer_handle = Some(consumer_handle);
            managed.pipeline_handle = Some(pipeline_handle);
            managed.channel_sender = Some(tx_for_storage);

            info!(route_id = %route_id, "Route started (aggregate with timeout)");
            return Ok(());
        }
        // Plain routes: one pipeline task, either strictly sequential or
        // spawning a task per exchange (optionally bounded by a semaphore).
        let pipeline_handle = match effective_concurrency {
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Per-exchange snapshot so `swap_pipeline` takes
                        // effect between exchanges.
                        let mut pipeline = pipeline.load().0.clone();

                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            ConcurrencyModel::Concurrent { max } => {
                // Optional semaphore caps in-flight exchange tasks at `max`.
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        tokio::spawn(async move {
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            let mut pipe = pipe_ref.load().0.clone();

                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };
        #[cfg(test)]
        emit_start_route_event("pipeline_spawned");

        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            false,
        );
        #[cfg(test)]
        emit_start_route_event("consumer_spawned");

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }
1191
    /// Stops the route's consumer and pipeline tasks.
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }
1195
    /// Full stop/start cycle; the short sleep gives spawned tasks time to
    /// observe cancellation before the route is started again.
    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
        self.start_route(route_id).await
    }
1201
    /// Suspends a fully-running route by cancelling and awaiting only the
    /// consumer; the pipeline task keeps running so buffered exchanges can
    /// drain and `resume_route` can later reattach a consumer.
    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        let managed = self
            .routes
            .get_mut(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);

        // Only a fully-running route can be suspended.
        if !consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot suspend route '{}' with execution lifecycle {}",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");

        // The route entry is re-fetched between steps to keep each &mut
        // borrow short around the awaits below.
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_cancel_token.cancel();

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        let consumer_handle = managed.consumer_handle.take();

        // Bounded wait for the consumer task to acknowledge cancellation.
        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
            if let Some(handle) = consumer_handle {
                let _ = handle.await;
            }
        })
        .await;

        if timeout_result.is_err() {
            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
        }

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Fresh token so a later resume starts from an uncancelled state.
        managed.consumer_cancel_token = CancellationToken::new();

        info!(route_id = %route_id, "Route suspended (pipeline still running)");
        Ok(())
    }
1261
1262 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1263 let managed = self
1265 .routes
1266 .get(route_id)
1267 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1268
1269 let consumer_running = handle_is_running(&managed.consumer_handle);
1270 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1271 if consumer_running || !pipeline_running {
1272 return Err(CamelError::RouteError(format!(
1273 "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1274 route_id,
1275 inferred_lifecycle_label(managed)
1276 )));
1277 }
1278
1279 let sender = managed.channel_sender.clone().ok_or_else(|| {
1281 CamelError::RouteError("Suspended route has no channel sender".into())
1282 })?;
1283
1284 let from_uri = managed.from_uri.clone();
1286
1287 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1288
1289 let (consumer, _) = super::consumer_management::create_route_consumer(
1290 &self.registry,
1291 &from_uri,
1292 &ControllerComponentContext::new(
1293 Arc::clone(&self.registry),
1294 Arc::clone(&self.languages),
1295 self.tracer_metrics
1296 .clone()
1297 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1298 Arc::clone(&self.platform_service),
1299 ),
1300 )?;
1301
1302 let managed = self
1304 .routes
1305 .get_mut(route_id)
1306 .expect("invariant: route must exist after prior existence check");
1307
1308 let consumer_cancel = managed.consumer_cancel_token.child_token();
1310
1311 let crash_notifier = self.crash_notifier.clone();
1312 let runtime_for_consumer = self.runtime.clone();
1313
1314 let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1316
1317 let consumer_handle = super::consumer_management::spawn_consumer_task(
1319 route_id.to_string(),
1320 consumer,
1321 consumer_ctx,
1322 crash_notifier,
1323 runtime_for_consumer,
1324 true,
1325 );
1326
1327 let managed = self
1329 .routes
1330 .get_mut(route_id)
1331 .expect("invariant: route must exist after prior existence check");
1332 managed.consumer_handle = Some(consumer_handle);
1333
1334 info!(route_id = %route_id, "Route resumed");
1335 Ok(())
1336 }
1337
1338 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1339 let route_ids: Vec<String> = {
1342 let mut pairs: Vec<_> = self
1343 .routes
1344 .iter()
1345 .filter(|(_, r)| r.definition.auto_startup())
1346 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1347 .collect();
1348 pairs.sort_by_key(|(_, order)| *order);
1349 pairs.into_iter().map(|(id, _)| id).collect()
1350 };
1351
1352 info!("Starting {} auto-startup routes", route_ids.len());
1353
1354 let mut errors: Vec<String> = Vec::new();
1356 for route_id in route_ids {
1357 if let Err(e) = self.start_route(&route_id).await {
1358 errors.push(format!("Route '{}': {}", route_id, e));
1359 }
1360 }
1361
1362 if !errors.is_empty() {
1363 return Err(CamelError::RouteError(format!(
1364 "Failed to start routes: {}",
1365 errors.join(", ")
1366 )));
1367 }
1368
1369 info!("All auto-startup routes started");
1370 Ok(())
1371 }
1372
1373 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1374 let route_ids: Vec<String> = {
1376 let mut pairs: Vec<_> = self
1377 .routes
1378 .iter()
1379 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1380 .collect();
1381 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1382 pairs.into_iter().map(|(id, _)| id).collect()
1383 };
1384
1385 info!("Stopping {} routes", route_ids.len());
1386
1387 for route_id in route_ids {
1388 let _ = self.stop_route(&route_id).await;
1389 }
1390
1391 info!("All routes stopped");
1392 Ok(())
1393 }
1394}
1395
// Unit tests live in a sibling file to keep this module focused.
#[cfg(test)]
#[path = "route_controller_tests.rs"]
mod tests;