1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21 BoxProcessor, CamelError, Exchange, IdentityProcessor, NoOpMetrics, NoopPlatformService,
22 PlatformService, ProducerContext, RouteController, RuntimeCommand, RuntimeHandle,
23};
24use camel_component_api::{
25 ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
26};
27use camel_endpoint::parse_uri;
28pub use camel_processor::aggregator::SharedLanguageRegistry;
29use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
30use camel_processor::circuit_breaker::CircuitBreakerLayer;
31use camel_processor::error_handler::ErrorHandlerLayer;
32
33use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
34use crate::lifecycle::adapters::route_compiler::{
35 compose_pipeline, compose_traced_pipeline_with_contracts,
36};
37use crate::lifecycle::application::route_definition::{
38 BuilderStep, RouteDefinition, RouteDefinitionInfo,
39};
40use crate::shared::components::domain::Registry;
41use crate::shared::observability::domain::{DetailLevel, TracerConfig};
42use arc_swap::ArcSwap;
43use camel_bean::BeanRegistry;
44
/// Message delivered over the crash-notification channel (see
/// `DefaultRouteController::set_crash_notifier`) when a route's consumer
/// task terminates abnormally.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    // Identifier of the route whose consumer crashed.
    pub route_id: String,
    // Human-readable description of the failure.
    pub error: String,
}
53
/// Newtype asserting `Sync` for a `BoxProcessor` so compiled pipelines can
/// be shared between tasks through `ArcSwap` (see `SharedPipeline`).
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: NOTE(review): this assertion is not proven locally. It is only
// sound if `BoxProcessor` is never used through `&SyncBoxProcessor` in a way
// that races (the code below always clones the inner processor before
// calling it) — confirm against `BoxProcessor`'s definition in camel_api.
unsafe impl Sync for SyncBoxProcessor {}

/// Hot-swappable pipeline handle: readers `load()` the current processor,
/// `swap_pipeline` atomically `store()`s a replacement without stopping the
/// route.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
72
// Test-only observation hook: `start_route` emits named lifecycle events
// ("pipeline_spawned", "consumer_spawned") so tests can assert ordering.
#[cfg(test)]
type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;

// Global slot for the hook; `None` means events are dropped.
#[cfg(test)]
static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
79
/// Installs (or clears, with `None`) the test-only start-route event hook.
#[cfg(test)]
fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
    let mut slot = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    *slot = hook;
}
86
/// Fires the test-only start-route event hook, if one is installed.
#[cfg(test)]
fn emit_start_route_event(event: &'static str) {
    let guard = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    match guard.as_ref() {
        Some(hook) => hook(event),
        None => {}
    }
}
97
/// Pipeline segments for a route whose top-level aggregate step has a
/// timeout-based completion condition. Such routes cannot run as a single
/// pipeline: `start_route` runs the aggregator as a stateful service between
/// the two segments so timed-out groups can still be emitted.
pub(super) struct AggregateSplitInfo {
    // Steps before the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    // Configuration of the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    // Steps after the aggregate step.
    pub(super) post_pipeline: SharedPipeline,
}
104
105pub(super) struct ManagedRoute {
106 pub(super) definition: RouteDefinitionInfo,
108 pub(super) from_uri: String,
110 pub(super) pipeline: SharedPipeline,
112 pub(super) concurrency: Option<ConcurrencyModel>,
114 pub(super) consumer_handle: Option<JoinHandle<()>>,
116 pub(super) pipeline_handle: Option<JoinHandle<()>>,
118 pub(super) consumer_cancel_token: CancellationToken,
121 pub(super) pipeline_cancel_token: CancellationToken,
124 pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
127 pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
129 pub(super) aggregate_split: Option<AggregateSplitInfo>,
130 pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
131}
132
133pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
134 handle.as_ref().is_some_and(|h| !h.is_finished())
135}
136
137fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
138 match (
139 handle_is_running(&managed.consumer_handle),
140 handle_is_running(&managed.pipeline_handle),
141 ) {
142 (true, true) => "Started",
143 (false, true) => "Suspended",
144 (true, false) => "Stopping",
145 (false, false) => "Stopped",
146 }
147}
148
149fn find_top_level_aggregate_with_timeout(
150 steps: &[BuilderStep],
151) -> Option<(usize, AggregatorConfig)> {
152 for (i, step) in steps.iter().enumerate() {
153 if let BuilderStep::Aggregate { config } = step {
154 if has_timeout_condition(&config.completion) {
155 return Some((i, config.clone()));
156 }
157 break;
158 }
159 }
160 None
161}
162
/// `ComponentContext` implementation backed by the controller's shared
/// registries; handed to components when creating endpoints and producers.
pub(crate) struct ControllerComponentContext {
    // Component registry keyed by URI scheme.
    registry: Arc<std::sync::Mutex<Registry>>,
    // Expression-language registry, shared with the aggregator.
    languages: SharedLanguageRegistry,
    // Metrics sink (callers pass `NoOpMetrics` when none is configured).
    metrics: Arc<dyn MetricsCollector>,
    // Platform services exposed to components.
    platform_service: Arc<dyn PlatformService>,
}
169
impl ControllerComponentContext {
    /// Bundles the controller's shared registries into a component context.
    pub(crate) fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        metrics: Arc<dyn MetricsCollector>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            registry,
            languages,
            metrics,
            platform_service,
        }
    }
}
185
impl ComponentContext for ControllerComponentContext {
    /// Looks up a component by URI scheme. A poisoned registry mutex is
    /// silently treated as "not found" (`lock().ok()?`).
    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
        self.registry.lock().ok()?.get(scheme)
    }

    /// Looks up an expression language by name; poisoning is likewise
    /// swallowed into `None`.
    fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
        self.languages.lock().ok()?.get(name).cloned()
    }

    fn metrics(&self) -> Arc<dyn MetricsCollector> {
        Arc::clone(&self.metrics)
    }

    fn platform_service(&self) -> Arc<dyn PlatformService> {
        Arc::clone(&self.platform_service)
    }
}
203
204fn is_pending(ex: &Exchange) -> bool {
205 ex.property("CamelAggregatorPending")
206 .and_then(|v| v.as_bool())
207 .unwrap_or(false)
208}
209
/// Waits until `pipeline` reports readiness, backing off while the circuit
/// breaker is open.
///
/// Behavior:
/// - `Ok`: pipeline ready, returns immediately.
/// - `CircuitOpen`: logs, sleeps 1s and retries; if `cancel` fires during
///   the sleep, the last `CircuitOpen` error is returned so callers can tell
///   this apart from a clean stop.
/// - any other error: logged and returned immediately.
async fn ready_with_backoff(
    pipeline: &mut BoxProcessor,
    cancel: &CancellationToken,
) -> Result<(), CamelError> {
    loop {
        match pipeline.ready().await {
            Ok(_) => return Ok(()),
            Err(CamelError::CircuitOpen(ref msg)) => {
                warn!("Circuit open, backing off: {msg}");
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                        continue;
                    }
                    _ = cancel.cancelled() => {
                        return Err(CamelError::CircuitOpen(msg.clone()));
                    }
                }
            }
            Err(e) => {
                error!("Pipeline not ready: {e}");
                return Err(e);
            }
        }
    }
}
242
243fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
244 let stamp = std::time::SystemTime::now()
245 .duration_since(std::time::UNIX_EPOCH)
246 .unwrap_or_default()
247 .as_nanos();
248 RuntimeCommand::FailRoute {
249 route_id: route_id.to_string(),
250 error: error.to_string(),
251 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
252 causation_id: None,
253 }
254}
255
256pub(super) async fn publish_runtime_failure(
257 runtime: Option<Weak<dyn RuntimeHandle>>,
258 route_id: &str,
259 error: &str,
260) {
261 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
262 return;
263 };
264 let command = runtime_failure_command(route_id, error);
265 if let Err(runtime_error) = runtime.execute(command).await {
266 warn!(
267 route_id = %route_id,
268 error = %runtime_error,
269 "failed to synchronize route crash with runtime projection"
270 );
271 }
272}
273
/// Default `RouteController`: compiles route definitions into runnable
/// pipelines and manages their consumer/pipeline task lifecycles.
pub struct DefaultRouteController {
    // Managed routes keyed by route id.
    routes: HashMap<String, ManagedRoute>,
    // Component registry shared with component contexts.
    registry: Arc<std::sync::Mutex<Registry>>,
    // Expression-language registry shared with the aggregator.
    languages: SharedLanguageRegistry,
    // Bean registry used during step resolution.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    // Weak handle to the runtime (weak to avoid a reference cycle).
    runtime: Option<Weak<dyn RuntimeHandle>>,
    // Fallback error handler for routes without their own.
    global_error_handler: Option<ErrorHandlerConfig>,
    // Channel notified when a consumer task crashes.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    // Tracer settings applied when compiling pipelines.
    tracing_enabled: bool,
    tracer_detail_level: DetailLevel,
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    // Platform services exposed to components.
    platform_service: Arc<dyn PlatformService>,
}
304
305impl DefaultRouteController {
    /// Creates a controller with an empty bean registry.
    pub fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
            platform_service,
        )
    }

    /// Creates a controller with an explicit bean registry and a no-op
    /// platform service.
    pub fn with_beans(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            beans,
            Arc::new(NoopPlatformService::default()),
        )
    }
329
    /// Canonical constructor: every public constructor funnels through here
    /// so default field values live in one place.
    fn with_beans_and_platform_service(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            // Empty language registry by default; see `with_languages`.
            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
            beans,
            runtime: None,
            global_error_handler: None,
            crash_notifier: None,
            tracing_enabled: false,
            tracer_detail_level: DetailLevel::Minimal,
            tracer_metrics: None,
            platform_service,
        }
    }
349
350 pub fn with_languages(
352 registry: Arc<std::sync::Mutex<Registry>>,
353 languages: SharedLanguageRegistry,
354 platform_service: Arc<dyn PlatformService>,
355 ) -> Self {
356 Self {
357 routes: HashMap::new(),
358 registry,
359 languages,
360 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
361 runtime: None,
362 global_error_handler: None,
363 crash_notifier: None,
364 tracing_enabled: false,
365 tracer_detail_level: DetailLevel::Minimal,
366 tracer_metrics: None,
367 platform_service,
368 }
369 }
370
    /// Stores a weak handle to the runtime (weak to avoid keeping the
    /// runtime alive through the controller).
    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.runtime = Some(Arc::downgrade(&runtime));
    }

    /// Installs the channel that receives `CrashNotification`s from
    /// consumer tasks.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }

    /// Sets the controller-wide error handler used by routes that do not
    /// define their own.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }

    /// Applies tracer settings; only affects pipelines compiled afterwards.
    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
        self.tracing_enabled = config.enabled;
        self.tracer_detail_level = config.detail_level.clone();
        self.tracer_metrics = config.metrics_collector.clone();
    }
395
396 fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
397 let mut producer_ctx = ProducerContext::new();
398 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
399 producer_ctx = producer_ctx.with_runtime(runtime);
400 }
401 Ok(producer_ctx)
402 }
403
404 fn resolve_error_handler(
406 &self,
407 config: ErrorHandlerConfig,
408 producer_ctx: &ProducerContext,
409 component_ctx: &dyn ComponentContext,
410 ) -> Result<ErrorHandlerLayer, CamelError> {
411 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
413 let parsed = parse_uri(uri)?;
414 let component = component_ctx
415 .resolve_component(&parsed.scheme)
416 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
417 let endpoint = component.create_endpoint(uri, component_ctx)?;
418 Some(endpoint.create_producer(producer_ctx)?)
419 } else {
420 None
421 };
422
423 let mut resolved_policies = Vec::new();
425 for policy in config.policies {
426 let handler_producer = if let Some(ref uri) = policy.handled_by {
427 let parsed = parse_uri(uri)?;
428 let component = component_ctx
429 .resolve_component(&parsed.scheme)
430 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
431 let endpoint = component.create_endpoint(uri, component_ctx)?;
432 Some(endpoint.create_producer(producer_ctx)?)
433 } else {
434 None
435 };
436 resolved_policies.push((policy, handler_producer));
437 }
438
439 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
440 }
441
    /// Resolves a unit-of-work config into an `ExchangeUoWLayer` plus the
    /// in-flight counter it increments. Passing an existing `counter` lets a
    /// recompiled pipeline keep the old route's in-flight accounting.
    ///
    /// # Errors
    /// Returns `RouteError` when an `on_complete`/`on_failure` hook URI
    /// cannot be resolved to a producer.
    fn resolve_uow_layer(
        &self,
        config: &UnitOfWorkConfig,
        producer_ctx: &ProducerContext,
        component_ctx: &dyn ComponentContext,
        counter: Option<Arc<AtomicU64>>,
    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
        // URI -> producer resolution for the optional lifecycle hooks.
        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
            let parsed = parse_uri(uri)?;
            let component = component_ctx
                .resolve_component(&parsed.scheme)
                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
            let endpoint = component.create_endpoint(uri, component_ctx)?;
            endpoint.create_producer(producer_ctx).map_err(|e| {
                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
            })
        };

        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;

        // Fresh counter unless the caller supplied one to carry over.
        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
        Ok((layer, counter))
    }
469
    /// Resolves builder steps into processors (with optional body-type
    /// contracts) via the shared step-resolution module, wiring up a
    /// component context from this controller's registries.
    pub(crate) fn resolve_steps(
        &self,
        steps: Vec<BuilderStep>,
        producer_ctx: &ProducerContext,
        registry: &Arc<std::sync::Mutex<Registry>>,
    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
        let component_ctx = Arc::new(ControllerComponentContext::new(
            Arc::clone(registry),
            Arc::clone(&self.languages),
            // Fall back to no-op metrics when tracing metrics are absent.
            self.tracer_metrics
                .clone()
                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            Arc::clone(&self.platform_service),
        ));

        super::step_resolution::resolve_steps(
            steps,
            producer_ctx,
            registry,
            &self.languages,
            &self.beans,
            component_ctx,
        )
    }
495
    /// Compiles `definition` into a pipeline and registers it in the stopped
    /// state (no tasks are spawned until `start_route`).
    ///
    /// Layering, innermost to outermost: traced step pipeline -> circuit
    /// breaker -> error handler (route-level overrides the controller-wide
    /// one) -> unit-of-work. A top-level aggregate step with a timeout
    /// completion is special-cased: the steps are split into pre/post
    /// segments and the aggregate runs as a service at start time.
    ///
    /// # Errors
    /// Fails if the route id already exists or any step/URI cannot be
    /// resolved.
    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
        let route_id = definition.route_id().to_string();

        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, "Adding route to controller");

        // Snapshot metadata before destructuring consumes the definition.
        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        let producer_ctx = self.build_producer_context()?;

        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
            Some((idx, agg_config)) => {
                // Split into [pre] Aggregate [post]; the aggregate step
                // itself is dropped here and recreated as an
                // `AggregatorService` when the route starts.
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // The main pipeline stays empty (identity below); the split
                // segments carry all the work.
                vec![]
            }
            None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
        };
        let route_id_for_tracing = route_id.clone();
        let mut pipeline = if processors_with_contracts.is_empty() {
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        // Optional circuit breaker wraps the step pipeline.
        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Route-level error handler takes precedence over the global one.
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Unit-of-work is applied last, so it is the outermost layer; its
        // counter is kept on the route for `in_flight_count`.
        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let (uow_layer, counter) =
                self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        // Register stopped: handles and channel sender are populated by
        // `start_route`.
        self.routes.insert(
            route_id.clone(),
            ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
            },
        );

        Ok(())
    }
637
    /// Compiles a definition into a standalone pipeline without registering
    /// it, applying the same layering as `add_route` (circuit breaker ->
    /// error handler -> unit-of-work). If a route with the same id already
    /// exists, its in-flight counter is reused so unit-of-work accounting
    /// survives a pipeline swap.
    ///
    /// NOTE(review): unlike `add_route`, this path has no
    /// `IdentityProcessor` fallback for an empty step list and does not
    /// perform the timeout-aggregate split — confirm this compile path is
    /// never used for such routes.
    pub fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<BoxProcessor, CamelError> {
        let route_id = def.route_id().to_string();

        let producer_ctx = self.build_producer_context()?;

        let processors_with_contracts =
            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
        let mut pipeline = compose_traced_pipeline_with_contracts(
            processors_with_contracts,
            &route_id,
            self.tracing_enabled,
            self.tracer_detail_level.clone(),
            self.tracer_metrics.clone(),
        );

        if let Some(cb_config) = def.circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Route-level error handler takes precedence over the global one.
        let eh_config = def
            .error_handler
            .clone()
            .or_else(|| self.global_error_handler.clone());
        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        if let Some(uow_config) = &def.unit_of_work {
            // Carry over the live route's counter (if any) so in-flight
            // numbers stay continuous across hot reloads.
            let existing_counter = self
                .routes
                .get(&route_id)
                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));

            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );

            let (uow_layer, _counter) = self.resolve_uow_layer(
                uow_config,
                &producer_ctx,
                &component_ctx,
                existing_counter,
            )?;

            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
        }

        Ok(pipeline)
    }
710
711 pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
717 let managed = self.routes.get(route_id).ok_or_else(|| {
718 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
719 })?;
720 if handle_is_running(&managed.consumer_handle)
721 || handle_is_running(&managed.pipeline_handle)
722 {
723 return Err(CamelError::RouteError(format!(
724 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
725 route_id,
726 inferred_lifecycle_label(managed)
727 )));
728 }
729 self.routes.remove(route_id);
730 info!(route_id = %route_id, "Route removed from controller");
731 Ok(())
732 }
733
    /// Number of routes currently managed (running or not).
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }
738
739 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
740 self.routes.get(route_id).map(|r| {
741 r.in_flight
742 .as_ref()
743 .map_or(0, |c| c.load(Ordering::Relaxed))
744 })
745 }
746
    /// Whether a route with this id is registered (in any lifecycle state).
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.contains_key(route_id)
    }

    /// Ids of all managed routes, in unspecified (HashMap) order.
    pub fn route_ids(&self) -> Vec<String> {
        self.routes.keys().cloned().collect()
    }

    /// Source hash of a route's definition, when one was recorded.
    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.routes
            .get(route_id)
            .and_then(|m| m.definition.source_hash())
    }
762
763 pub fn auto_startup_route_ids(&self) -> Vec<String> {
765 let mut pairs: Vec<(String, i32)> = self
766 .routes
767 .iter()
768 .filter(|(_, managed)| managed.definition.auto_startup())
769 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
770 .collect();
771 pairs.sort_by_key(|(_, order)| *order);
772 pairs.into_iter().map(|(id, _)| id).collect()
773 }
774
775 pub fn shutdown_route_ids(&self) -> Vec<String> {
777 let mut pairs: Vec<(String, i32)> = self
778 .routes
779 .iter()
780 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
781 .collect();
782 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
783 pairs.into_iter().map(|(id, _)| id).collect()
784 }
785
786 pub fn swap_pipeline(
791 &self,
792 route_id: &str,
793 new_pipeline: BoxProcessor,
794 ) -> Result<(), CamelError> {
795 let managed = self
796 .routes
797 .get(route_id)
798 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
799
800 if managed.aggregate_split.is_some() {
801 tracing::warn!(
802 route_id = %route_id,
803 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
804 );
805 }
806
807 managed
808 .pipeline
809 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
810 info!(route_id = %route_id, "Pipeline swapped atomically");
811 Ok(())
812 }
813
    /// Consumer URI a route reads from, if the route exists.
    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
        self.routes.get(route_id).map(|r| r.from_uri.clone())
    }

    /// Clone of a route's current pipeline snapshot (subsequent swaps are
    /// not reflected in the returned processor).
    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
        self.routes
            .get(route_id)
            .map(|r| r.pipeline.load().0.clone())
    }

    /// Delegates the stop sequence to the consumer-management module.
    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
    }

    /// Hot-reload entry point; currently identical to `start_route`.
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }

    /// Hot-reload entry point; currently identical to `stop_route`.
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
841}
842
843#[async_trait::async_trait]
844impl RouteController for DefaultRouteController {
    /// Starts a registered route: spawns its consumer task and a pipeline
    /// task bridged by a bounded channel.
    ///
    /// State rules: fully running is a no-op `Ok`; consumer-down/pipeline-up
    /// (Suspended) and consumer-up/pipeline-down (inconsistent) are errors
    /// directing the caller to `resume_route` or `stop_route` respectively.
    ///
    /// Routes with a timeout aggregate get a dedicated pipeline loop that
    /// runs pre-pipeline -> aggregator service -> post-pipeline and also
    /// drains late (timed-out) exchanges; all other routes run either a
    /// sequential loop or a fan-out loop per `ConcurrencyModel`.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            if consumer_running && pipeline_running {
                return Ok(());
            }
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
        }

        info!(route_id = %route_id, "Starting route");

        // Clone what the spawned tasks need; the pipeline is shared via
        // ArcSwap so later swaps are picked up per-exchange.
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            ),
        )?;

        // Route-level override beats the consumer's preferred model.
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Bounded consumer->pipeline channel: backpressure at 256 envelopes.
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        // Child tokens so suspend/stop can cancel without consuming the
        // route's root tokens.
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        // Keep a sender so resume_route can re-attach a fresh consumer.
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        // NOTE(review): this re-lookup appears redundant with the one above
        // (nothing between them touches `self.routes`).
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        if let Some(split) = managed.aggregate_split.as_ref() {
            // Channel for exchanges completed by aggregator timeout rather
            // than by an incoming exchange.
            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);

            let route_cancel_clone = pipeline_cancel.clone();
            let svc = AggregatorService::new(
                split.agg_config.clone(),
                late_tx,
                Arc::clone(&self.languages),
                route_cancel_clone,
            );
            let agg = Arc::new(std::sync::Mutex::new(svc));

            managed.agg_service = Some(Arc::clone(&agg));

            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
            let pre_pipeline = Arc::clone(&split.pre_pipeline);
            let post_pipeline = Arc::clone(&split.post_pipeline);

            let pipeline_handle = tokio::spawn(async move {
                loop {
                    tokio::select! {
                        // `biased` gives late (timed-out) exchanges priority
                        // over new envelopes.
                        biased;

                        late_ex = async {
                            let mut rx = late_rx.lock().await;
                            rx.recv().await
                        } => {
                            match late_ex {
                                Some(ex) => {
                                    // Timed-out aggregate: run the post
                                    // segment only.
                                    let pipe = post_pipeline.load();
                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
                                    }
                                }
                                None => return,
                            }
                        }

                        envelope_opt = rx.recv() => {
                            match envelope_opt {
                                Some(envelope) => {
                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
                                    // 1) pre segment
                                    let pre_pipe = pre_pipeline.load();
                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
                                        Ok(ex) => ex,
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                            continue;
                                        }
                                    };

                                    // 2) aggregator (cloned out of the mutex
                                    // so the lock is not held across await)
                                    let ex = {
                                        let cloned_svc = agg
                                            .lock()
                                            .expect("mutex poisoned: another thread panicked while holding this lock")
                                            .clone();
                                        cloned_svc.oneshot(ex).await
                                    };

                                    // 3) post segment, unless the exchange
                                    // was absorbed into a pending group
                                    match ex {
                                        Ok(ex) => {
                                            if !is_pending(&ex) {
                                                let post_pipe = post_pipeline.load();
                                                let out = post_pipe.0.clone().oneshot(ex).await;
                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
                                            } else if let Some(tx) = reply_tx {
                                                let _ = tx.send(Ok(ex));
                                            }
                                        }
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                        }
                                    }
                                }
                                None => return,
                            }
                        }

                        _ = pipeline_cancel.cancelled() => {
                            // Flush: force-complete open groups, then drain
                            // any late exchanges already queued.
                            {
                                let guard = agg
                                    .lock()
                                    .expect("mutex poisoned: another thread panicked while holding this lock");
                                guard.force_complete_all();
                            }
                            let mut rx_guard = late_rx.lock().await;
                            while let Ok(late_ex) = rx_guard.try_recv() {
                                let pipe = post_pipeline.load();
                                let _ = pipe.0.clone().oneshot(late_ex).await;
                            }
                            break;
                        }
                    }
                }
            });
            #[cfg(test)]
            emit_start_route_event("pipeline_spawned");

            // Consumer is spawned only after the pipeline loop is live.
            let consumer_handle = super::consumer_management::spawn_consumer_task(
                route_id.to_string(),
                consumer,
                consumer_ctx,
                crash_notifier,
                runtime_for_consumer,
                false,
            );
            #[cfg(test)]
            emit_start_route_event("consumer_spawned");

            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist");
            managed.consumer_handle = Some(consumer_handle);
            managed.pipeline_handle = Some(pipeline_handle);
            managed.channel_sender = Some(tx_for_storage);

            info!(route_id = %route_id, "Route started (aggregate with timeout)");
            return Ok(());
        }
        // Non-aggregate path: one pipeline loop per concurrency model.
        let pipeline_handle = match effective_concurrency {
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Fresh load per exchange so swaps take effect.
                        let mut pipeline = pipeline.load().0.clone();

                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            ConcurrencyModel::Concurrent { max } => {
                // Optional semaphore caps concurrent exchanges at `max`.
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        // Fan out: each exchange gets its own task.
                        tokio::spawn(async move {
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            let mut pipe = pipe_ref.load().0.clone();

                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };
        #[cfg(test)]
        emit_start_route_event("pipeline_spawned");

        // Consumer is spawned only after the pipeline loop is live.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            false,
        );
        #[cfg(test)]
        emit_start_route_event("consumer_spawned");

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }
1170
    /// Stops a route (delegates to the consumer-management stop sequence).
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }

    /// Stops then restarts a route. The 100ms pause gives spawned tasks
    /// time to observe cancellation before fresh ones are started.
    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
        self.start_route(route_id).await
    }
1180
    /// Suspends a route: cancels and awaits only the consumer task, leaving
    /// the pipeline task running so queued exchanges keep draining. Only a
    /// fully-running route can be suspended.
    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        let managed = self
            .routes
            .get_mut(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);

        if !consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot suspend route '{}' with execution lifecycle {}",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");

        // NOTE(review): the repeated `get_mut` re-lookups below appear to be
        // borrow-scope artifacts; nothing mutates `self.routes` in between.
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_cancel_token.cancel();

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        let consumer_handle = managed.consumer_handle.take();

        // Bounded wait for the consumer to exit after cancellation.
        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
            if let Some(handle) = consumer_handle {
                let _ = handle.await;
            }
        })
        .await;

        if timeout_result.is_err() {
            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
        }

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Fresh token so a later resume gets an uncancelled consumer token.
        managed.consumer_cancel_token = CancellationToken::new();

        info!(route_id = %route_id, "Route suspended (pipeline still running)");
        Ok(())
    }
1240
1241 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1242 let managed = self
1244 .routes
1245 .get(route_id)
1246 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1247
1248 let consumer_running = handle_is_running(&managed.consumer_handle);
1249 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1250 if consumer_running || !pipeline_running {
1251 return Err(CamelError::RouteError(format!(
1252 "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1253 route_id,
1254 inferred_lifecycle_label(managed)
1255 )));
1256 }
1257
1258 let sender = managed.channel_sender.clone().ok_or_else(|| {
1260 CamelError::RouteError("Suspended route has no channel sender".into())
1261 })?;
1262
1263 let from_uri = managed.from_uri.clone();
1265
1266 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1267
1268 let (consumer, _) = super::consumer_management::create_route_consumer(
1269 &self.registry,
1270 &from_uri,
1271 &ControllerComponentContext::new(
1272 Arc::clone(&self.registry),
1273 Arc::clone(&self.languages),
1274 self.tracer_metrics
1275 .clone()
1276 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1277 Arc::clone(&self.platform_service),
1278 ),
1279 )?;
1280
1281 let managed = self
1283 .routes
1284 .get_mut(route_id)
1285 .expect("invariant: route must exist after prior existence check");
1286
1287 let consumer_cancel = managed.consumer_cancel_token.child_token();
1289
1290 let crash_notifier = self.crash_notifier.clone();
1291 let runtime_for_consumer = self.runtime.clone();
1292
1293 let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1295
1296 let consumer_handle = super::consumer_management::spawn_consumer_task(
1298 route_id.to_string(),
1299 consumer,
1300 consumer_ctx,
1301 crash_notifier,
1302 runtime_for_consumer,
1303 true,
1304 );
1305
1306 let managed = self
1308 .routes
1309 .get_mut(route_id)
1310 .expect("invariant: route must exist after prior existence check");
1311 managed.consumer_handle = Some(consumer_handle);
1312
1313 info!(route_id = %route_id, "Route resumed");
1314 Ok(())
1315 }
1316
1317 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1318 let route_ids: Vec<String> = {
1321 let mut pairs: Vec<_> = self
1322 .routes
1323 .iter()
1324 .filter(|(_, r)| r.definition.auto_startup())
1325 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1326 .collect();
1327 pairs.sort_by_key(|(_, order)| *order);
1328 pairs.into_iter().map(|(id, _)| id).collect()
1329 };
1330
1331 info!("Starting {} auto-startup routes", route_ids.len());
1332
1333 let mut errors: Vec<String> = Vec::new();
1335 for route_id in route_ids {
1336 if let Err(e) = self.start_route(&route_id).await {
1337 errors.push(format!("Route '{}': {}", route_id, e));
1338 }
1339 }
1340
1341 if !errors.is_empty() {
1342 return Err(CamelError::RouteError(format!(
1343 "Failed to start routes: {}",
1344 errors.join(", ")
1345 )));
1346 }
1347
1348 info!("All auto-startup routes started");
1349 Ok(())
1350 }
1351
1352 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1353 let route_ids: Vec<String> = {
1355 let mut pairs: Vec<_> = self
1356 .routes
1357 .iter()
1358 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1359 .collect();
1360 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1361 pairs.into_iter().map(|(id, _)| id).collect()
1362 };
1363
1364 info!("Stopping {} routes", route_ids.len());
1365
1366 for route_id in route_ids {
1367 let _ = self.stop_route(&route_id).await;
1368 }
1369
1370 info!("All routes stopped");
1371 Ok(())
1372 }
1373}
1374
// Unit tests live in a sibling file (via `#[path]`) rather than inline,
// keeping this module focused on the controller implementation.
#[cfg(test)]
#[path = "route_controller_tests.rs"]
mod tests;