1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21 BoxProcessor, CamelError, Exchange, IdentityProcessor, NoOpMetrics, NoopPlatformService,
22 PlatformService, ProducerContext, RouteController, RuntimeCommand, RuntimeHandle,
23};
24use camel_component_api::{
25 ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
26};
27use camel_endpoint::parse_uri;
28pub use camel_processor::aggregator::SharedLanguageRegistry;
29use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
30use camel_processor::circuit_breaker::CircuitBreakerLayer;
31use camel_processor::error_handler::ErrorHandlerLayer;
32
33use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
34use crate::lifecycle::adapters::route_compiler::{
35 compose_pipeline, compose_traced_pipeline_with_contracts,
36};
37use crate::lifecycle::application::route_definition::{
38 BuilderStep, RouteDefinition, RouteDefinitionInfo,
39};
40use crate::shared::components::domain::Registry;
41use crate::shared::observability::domain::{DetailLevel, TracerConfig};
42use arc_swap::ArcSwap;
43use camel_bean::BeanRegistry;
44
/// Message emitted on the crash-notifier channel (see
/// `DefaultRouteController::set_crash_notifier`) when a route's consumer
/// task terminates with a failure.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    // Identifier of the route whose consumer crashed.
    pub route_id: String,
    // Human-readable description of the failure.
    pub error: String,
}
53
/// Wrapper that asserts a `BoxProcessor` may be shared between threads so it
/// can be stored inside an `ArcSwap` and loaded lock-free from worker tasks.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY(review): this claims `BoxProcessor` is safe to reference from
// multiple threads concurrently. That is not provable from this file alone —
// it relies on processor implementations never exposing unsynchronized
// interior mutability through `&self`. TODO: confirm and document that
// invariant where `BoxProcessor` is defined.
unsafe impl Sync for SyncBoxProcessor {}
70
71type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
72
/// Pre-compiled segments of a route whose first top-level aggregate step has
/// a timeout completion condition. Such routes execute as
/// pre-pipeline -> aggregator service -> post-pipeline rather than as one
/// flat pipeline (see `add_route` / `start_route`).
pub(super) struct AggregateSplitInfo {
    // Processors for the steps before the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    // Configuration of the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    // Processors for the steps after the aggregate step.
    pub(super) post_pipeline: SharedPipeline,
}
79
/// Per-route runtime state owned by the controller.
pub(super) struct ManagedRoute {
    // Immutable snapshot of the route definition (id, startup order, ...).
    pub(super) definition: RouteDefinitionInfo,
    // Endpoint URI the consumer is created from.
    pub(super) from_uri: String,
    // Hot-swappable processor pipeline executed for each exchange.
    pub(super) pipeline: SharedPipeline,
    // Explicit concurrency model; `None` falls back to the consumer's own.
    pub(super) concurrency: Option<ConcurrencyModel>,
    // Task feeding exchanges from the consumer into the channel.
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    // Task draining the channel through the pipeline.
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    // Cancels only the consumer task (used by suspend_route).
    pub(super) consumer_cancel_token: CancellationToken,
    // Cancels the pipeline task (used by stop).
    pub(super) pipeline_cancel_token: CancellationToken,
    // Sender side of the consumer->pipeline channel; retained so a resumed
    // consumer can keep feeding the still-running pipeline task.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    // In-flight exchange counter installed by the unit-of-work layer, if any.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    // Present when the route uses the aggregate-with-timeout split layout.
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    // Running aggregator service for the split layout, set on start.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
107
108pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
109 handle.as_ref().is_some_and(|h| !h.is_finished())
110}
111
112fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
113 match (
114 handle_is_running(&managed.consumer_handle),
115 handle_is_running(&managed.pipeline_handle),
116 ) {
117 (true, true) => "Started",
118 (false, true) => "Suspended",
119 (true, false) => "Stopping",
120 (false, false) => "Stopped",
121 }
122}
123
124fn find_top_level_aggregate_with_timeout(
125 steps: &[BuilderStep],
126) -> Option<(usize, AggregatorConfig)> {
127 for (i, step) in steps.iter().enumerate() {
128 if let BuilderStep::Aggregate { config } = step {
129 if has_timeout_condition(&config.completion) {
130 return Some((i, config.clone()));
131 }
132 break;
133 }
134 }
135 None
136}
137
/// `ComponentContext` implementation backed by the controller's shared
/// registries and services; handed to components when resolving endpoints,
/// languages, and platform facilities.
pub(crate) struct ControllerComponentContext {
    registry: Arc<std::sync::Mutex<Registry>>,
    languages: SharedLanguageRegistry,
    metrics: Arc<dyn MetricsCollector>,
    platform_service: Arc<dyn PlatformService>,
}
144
145impl ControllerComponentContext {
146 pub(crate) fn new(
147 registry: Arc<std::sync::Mutex<Registry>>,
148 languages: SharedLanguageRegistry,
149 metrics: Arc<dyn MetricsCollector>,
150 platform_service: Arc<dyn PlatformService>,
151 ) -> Self {
152 Self {
153 registry,
154 languages,
155 metrics,
156 platform_service,
157 }
158 }
159}
160
161impl ComponentContext for ControllerComponentContext {
162 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
163 self.registry.lock().ok()?.get(scheme)
164 }
165
166 fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
167 self.languages.lock().ok()?.get(name).cloned()
168 }
169
170 fn metrics(&self) -> Arc<dyn MetricsCollector> {
171 Arc::clone(&self.metrics)
172 }
173
174 fn platform_service(&self) -> Arc<dyn PlatformService> {
175 Arc::clone(&self.platform_service)
176 }
177}
178
179fn is_pending(ex: &Exchange) -> bool {
180 ex.property("CamelAggregatorPending")
181 .and_then(|v| v.as_bool())
182 .unwrap_or(false)
183}
184
185async fn ready_with_backoff(
192 pipeline: &mut BoxProcessor,
193 cancel: &CancellationToken,
194) -> Result<(), CamelError> {
195 loop {
196 match pipeline.ready().await {
197 Ok(_) => return Ok(()),
198 Err(CamelError::CircuitOpen(ref msg)) => {
199 warn!("Circuit open, backing off: {msg}");
200 tokio::select! {
201 _ = tokio::time::sleep(Duration::from_secs(1)) => {
202 continue;
203 }
204 _ = cancel.cancelled() => {
205 return Err(CamelError::CircuitOpen(msg.clone()));
207 }
208 }
209 }
210 Err(e) => {
211 error!("Pipeline not ready: {e}");
212 return Err(e);
213 }
214 }
215 }
216}
217
218fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
219 let stamp = std::time::SystemTime::now()
220 .duration_since(std::time::UNIX_EPOCH)
221 .unwrap_or_default()
222 .as_nanos();
223 RuntimeCommand::FailRoute {
224 route_id: route_id.to_string(),
225 error: error.to_string(),
226 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
227 causation_id: None,
228 }
229}
230
231pub(super) async fn publish_runtime_failure(
232 runtime: Option<Weak<dyn RuntimeHandle>>,
233 route_id: &str,
234 error: &str,
235) {
236 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
237 return;
238 };
239 let command = runtime_failure_command(route_id, error);
240 if let Err(runtime_error) = runtime.execute(command).await {
241 warn!(
242 route_id = %route_id,
243 error = %runtime_error,
244 "failed to synchronize route crash with runtime projection"
245 );
246 }
247}
248
/// Default `RouteController` implementation: owns the set of managed routes
/// and drives their consumer/pipeline task lifecycle (start, stop, suspend,
/// resume, hot-swap).
pub struct DefaultRouteController {
    // All known routes keyed by route id.
    routes: HashMap<String, ManagedRoute>,
    // Component registry used to resolve endpoint URI schemes.
    registry: Arc<std::sync::Mutex<Registry>>,
    // Expression-language registry shared with aggregators and processors.
    languages: SharedLanguageRegistry,
    // Bean registry consulted during step resolution.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    // Weak runtime handle so crash projection never keeps the runtime alive.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    // Fallback error-handler config for routes that define none.
    global_error_handler: Option<ErrorHandlerConfig>,
    // Channel notified when a route's consumer crashes.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    // Whether per-step tracing is woven into compiled pipelines.
    tracing_enabled: bool,
    // Verbosity of the tracer when enabled.
    tracer_detail_level: DetailLevel,
    // Metrics sink used by the tracer and component contexts.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    // Platform integration surface passed to components.
    platform_service: Arc<dyn PlatformService>,
}
279
impl DefaultRouteController {
    /// Creates a controller with a fresh, empty bean registry.
    pub fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
            platform_service,
        )
    }

    /// Creates a controller with a caller-supplied bean registry and a
    /// no-op platform service.
    pub fn with_beans(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            beans,
            Arc::new(NoopPlatformService::default()),
        )
    }

    /// Common constructor body: every optional field starts unset/disabled.
    fn with_beans_and_platform_service(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
            beans,
            runtime: None,
            global_error_handler: None,
            crash_notifier: None,
            tracing_enabled: false,
            tracer_detail_level: DetailLevel::Minimal,
            tracer_metrics: None,
            platform_service,
        }
    }

    /// Creates a controller sharing an existing language registry; the bean
    /// registry starts empty.
    pub fn with_languages(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            languages,
            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
            runtime: None,
            global_error_handler: None,
            crash_notifier: None,
            tracing_enabled: false,
            tracer_detail_level: DetailLevel::Minimal,
            tracer_metrics: None,
            platform_service,
        }
    }

    /// Stores a weak reference to the runtime; weak so the controller never
    /// keeps the runtime alive on its own.
    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.runtime = Some(Arc::downgrade(&runtime));
    }

    /// Installs the channel that receives `CrashNotification`s from
    /// consumer tasks.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }

    /// Sets the global error-handler config applied to routes that do not
    /// define their own.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }

    /// Applies tracer settings used when compiling route pipelines.
    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
        self.tracing_enabled = config.enabled;
        self.tracer_detail_level = config.detail_level.clone();
        self.tracer_metrics = config.metrics_collector.clone();
    }

    /// Builds a producer context, attaching the runtime handle when it can
    /// still be upgraded.
    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
        let mut producer_ctx = ProducerContext::new();
        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
            producer_ctx = producer_ctx.with_runtime(runtime);
        }
        Ok(producer_ctx)
    }

    /// Resolves an error-handler config into a layer, creating a producer
    /// for the dead-letter URI (if set) and for each policy's `handled_by`
    /// URI.
    ///
    /// # Errors
    /// Fails when a URI cannot be parsed or its component, endpoint, or
    /// producer cannot be created.
    fn resolve_error_handler(
        &self,
        config: ErrorHandlerConfig,
        producer_ctx: &ProducerContext,
        component_ctx: &dyn ComponentContext,
    ) -> Result<ErrorHandlerLayer, CamelError> {
        // Dead-letter-channel producer, only when a DLC URI is configured.
        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
            let parsed = parse_uri(uri)?;
            let component = component_ctx
                .resolve_component(&parsed.scheme)
                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
            let endpoint = component.create_endpoint(uri, component_ctx)?;
            Some(endpoint.create_producer(producer_ctx)?)
        } else {
            None
        };

        // Pair each policy with its optional handler producer.
        let mut resolved_policies = Vec::new();
        for policy in config.policies {
            let handler_producer = if let Some(ref uri) = policy.handled_by {
                let parsed = parse_uri(uri)?;
                let component = component_ctx
                    .resolve_component(&parsed.scheme)
                    .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
                let endpoint = component.create_endpoint(uri, component_ctx)?;
                Some(endpoint.create_producer(producer_ctx)?)
            } else {
                None
            };
            resolved_policies.push((policy, handler_producer));
        }

        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
    }

    /// Resolves a unit-of-work config into a layer plus the in-flight
    /// counter it maintains. Passing an existing `counter` preserves the
    /// in-flight count across pipeline recompilation (hot reload).
    fn resolve_uow_layer(
        &self,
        config: &UnitOfWorkConfig,
        producer_ctx: &ProducerContext,
        component_ctx: &dyn ComponentContext,
        counter: Option<Arc<AtomicU64>>,
    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
        // Turns a hook URI into a producer, wrapping failures with context.
        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
            let parsed = parse_uri(uri)?;
            let component = component_ctx
                .resolve_component(&parsed.scheme)
                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
            let endpoint = component.create_endpoint(uri, component_ctx)?;
            endpoint.create_producer(producer_ctx).map_err(|e| {
                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
            })
        };

        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;

        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
        Ok((layer, counter))
    }

    /// Resolves builder steps into processors (with optional body-type
    /// contracts), assembling a fresh component context for the resolution.
    pub(crate) fn resolve_steps(
        &self,
        steps: Vec<BuilderStep>,
        producer_ctx: &ProducerContext,
        registry: &Arc<std::sync::Mutex<Registry>>,
    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
        let component_ctx = Arc::new(ControllerComponentContext::new(
            Arc::clone(registry),
            Arc::clone(&self.languages),
            self.tracer_metrics
                .clone()
                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            Arc::clone(&self.platform_service),
        ));

        super::step_resolution::resolve_steps(
            steps,
            producer_ctx,
            registry,
            &self.languages,
            &self.beans,
            component_ctx,
        )
    }

    /// Compiles a route definition and registers it (stopped) under its id.
    ///
    /// A route whose first top-level aggregate step has a timeout condition
    /// is split into pre/aggregate/post segments (stored in
    /// `aggregate_split`); all other routes compile into one pipeline.
    /// Layers wrap the pipeline inside-out: circuit breaker first, then
    /// error handler, then unit of work — so UoW observes everything.
    ///
    /// # Errors
    /// Fails when the route id already exists or any step fails to resolve.
    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
        let route_id = definition.route_id().to_string();

        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, "Adding route to controller");

        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        let producer_ctx = self.build_producer_context()?;

        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
            Some((idx, agg_config)) => {
                // Split the step list around the aggregate step and compile
                // the two halves into independent hot-swappable pipelines.
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // The main pipeline stays empty; execution happens through
                // the split segments instead.
                vec![]
            }
            None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
        };
        let route_id_for_tracing = route_id.clone();
        let mut pipeline = if processors_with_contracts.is_empty() {
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Route-local error handler wins over the global one.
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let (uow_layer, counter) =
                self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        self.routes.insert(
            route_id.clone(),
            ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
            },
        );

        Ok(())
    }

    /// Compiles a definition into a standalone pipeline without registering
    /// it — used for hot reload, where the result is swapped into an
    /// already-managed route. Reuses the route's existing in-flight counter
    /// (when present) so the count survives recompilation.
    pub fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<BoxProcessor, CamelError> {
        let route_id = def.route_id().to_string();

        let producer_ctx = self.build_producer_context()?;

        let processors_with_contracts =
            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
        let mut pipeline = compose_traced_pipeline_with_contracts(
            processors_with_contracts,
            &route_id,
            self.tracing_enabled,
            self.tracer_detail_level.clone(),
            self.tracer_metrics.clone(),
        );

        if let Some(cb_config) = def.circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        let eh_config = def
            .error_handler
            .clone()
            .or_else(|| self.global_error_handler.clone());
        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        if let Some(uow_config) = &def.unit_of_work {
            // Keep the existing counter so in-flight tracking is continuous
            // across the reload.
            let existing_counter = self
                .routes
                .get(&route_id)
                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));

            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );

            let (uow_layer, _counter) = self.resolve_uow_layer(
                uow_config,
                &producer_ctx,
                &component_ctx,
                existing_counter,
            )?;

            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
        }

        Ok(pipeline)
    }

    /// Removes a route from the controller.
    ///
    /// # Errors
    /// Fails when the route is unknown, or when either of its tasks is
    /// still running (the route must be stopped first).
    pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        let managed = self.routes.get(route_id).ok_or_else(|| {
            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
        })?;
        if handle_is_running(&managed.consumer_handle)
            || handle_is_running(&managed.pipeline_handle)
        {
            return Err(CamelError::RouteError(format!(
                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }
        self.routes.remove(route_id);
        info!(route_id = %route_id, "Route removed from controller");
        Ok(())
    }

    /// Number of routes currently registered (running or not).
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }

    /// Current in-flight exchange count for a route; `None` for unknown
    /// routes, 0 when the route has no unit-of-work counter.
    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
        self.routes.get(route_id).map(|r| {
            r.in_flight
                .as_ref()
                .map_or(0, |c| c.load(Ordering::Relaxed))
        })
    }

    /// Whether a route with this id is registered.
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.contains_key(route_id)
    }

    /// All registered route ids (unordered).
    pub fn route_ids(&self) -> Vec<String> {
        self.routes.keys().cloned().collect()
    }

    /// Source hash recorded on the route's definition, if any — used to
    /// detect definition changes.
    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.routes
            .get(route_id)
            .and_then(|m| m.definition.source_hash())
    }

    /// Ids of auto-startup routes, sorted by ascending startup order.
    pub fn auto_startup_route_ids(&self) -> Vec<String> {
        let mut pairs: Vec<(String, i32)> = self
            .routes
            .iter()
            .filter(|(_, managed)| managed.definition.auto_startup())
            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
            .collect();
        pairs.sort_by_key(|(_, order)| *order);
        pairs.into_iter().map(|(id, _)| id).collect()
    }

    /// Ids of all routes in shutdown order: descending startup order, so
    /// the last-started route stops first.
    pub fn shutdown_route_ids(&self) -> Vec<String> {
        let mut pairs: Vec<(String, i32)> = self
            .routes
            .iter()
            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
            .collect();
        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
        pairs.into_iter().map(|(id, _)| id).collect()
    }

    /// Atomically replaces a route's pipeline; in-flight exchanges keep the
    /// processor they already loaded, new ones see the replacement.
    ///
    /// Aggregate-split routes only get their (unused) main pipeline swapped;
    /// the pre/post segments are not reloadable — a warning is logged.
    pub fn swap_pipeline(
        &self,
        route_id: &str,
        new_pipeline: BoxProcessor,
    ) -> Result<(), CamelError> {
        let managed = self
            .routes
            .get(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        if managed.aggregate_split.is_some() {
            tracing::warn!(
                route_id = %route_id,
                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
            );
        }

        managed
            .pipeline
            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
        info!(route_id = %route_id, "Pipeline swapped atomically");
        Ok(())
    }

    /// The `from` URI of a route, if registered.
    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
        self.routes.get(route_id).map(|r| r.from_uri.clone())
    }

    /// A clone of the route's current pipeline processor, if registered.
    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
        self.routes
            .get(route_id)
            .map(|r| r.pipeline.load().0.clone())
    }

    /// Delegates the stop sequence to the consumer-management module.
    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
    }

    /// Hot-reload entry point: currently a plain start.
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }

    /// Hot-reload entry point: currently a plain stop.
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
}
817
#[async_trait::async_trait]
impl RouteController for DefaultRouteController {
    /// Starts a stopped route: creates a consumer for its `from` URI, wires
    /// a bounded channel between consumer and pipeline, and spawns both
    /// tasks. Already-started routes are a no-op; suspended or inconsistent
    /// routes return an error directing the caller to the right API.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Validate the current lifecycle before touching anything.
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            if consumer_running && pipeline_running {
                return Ok(());
            }
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
        }

        info!(route_id = %route_id, "Starting route");

        // Clone out what the spawned tasks need so `self` isn't borrowed.
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            ),
        )?;

        // Route-level concurrency config overrides the consumer's default.
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Bounded consumer->pipeline channel; child tokens let a later
        // suspend/stop cancel each side independently.
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        // Keep a sender so resume_route can reattach a consumer later.
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Aggregate-with-timeout routes run the split layout:
        // pre-pipeline -> aggregator service -> post-pipeline, with a side
        // channel for aggregations completed by timeout ("late" exchanges).
        if let Some(split) = managed.aggregate_split.as_ref() {
            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);

            let route_cancel_clone = pipeline_cancel.clone();
            let svc = AggregatorService::new(
                split.agg_config.clone(),
                late_tx,
                Arc::clone(&self.languages),
                route_cancel_clone,
            );
            let agg = Arc::new(std::sync::Mutex::new(svc));

            managed.agg_service = Some(Arc::clone(&agg));

            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
            let pre_pipeline = Arc::clone(&split.pre_pipeline);
            let post_pipeline = Arc::clone(&split.post_pipeline);

            let consumer_handle = super::consumer_management::spawn_consumer_task(
                route_id.to_string(),
                consumer,
                consumer_ctx,
                crash_notifier,
                runtime_for_consumer,
                false,
            );

            let pipeline_handle = tokio::spawn(async move {
                loop {
                    tokio::select! {
                        // biased: poll arms in declaration order so
                        // timeout-completed aggregations drain before new
                        // input is accepted.
                        biased;

                        late_ex = async {
                            let mut rx = late_rx.lock().await;
                            rx.recv().await
                        } => {
                            match late_ex {
                                Some(ex) => {
                                    // Timeout-completed aggregation: run only
                                    // the post-aggregate segment.
                                    let pipe = post_pipeline.load();
                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
                                    }
                                }
                                None => return,
                            }
                        }

                        envelope_opt = rx.recv() => {
                            match envelope_opt {
                                Some(envelope) => {
                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
                                    let pre_pipe = pre_pipeline.load();
                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
                                        Ok(ex) => ex,
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                            continue;
                                        }
                                    };

                                    // Clone the service out of the std mutex so
                                    // the guard is dropped before awaiting.
                                    let ex = {
                                        let cloned_svc = agg
                                            .lock()
                                            .expect("mutex poisoned: another thread panicked while holding this lock")
                                            .clone();
                                        cloned_svc.oneshot(ex).await
                                    };

                                    match ex {
                                        Ok(ex) => {
                                            // Pending exchanges were absorbed by the
                                            // aggregator; only completed ones continue
                                            // into the post-pipeline.
                                            if !is_pending(&ex) {
                                                let post_pipe = post_pipeline.load();
                                                let out = post_pipe.0.clone().oneshot(ex).await;
                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
                                            } else if let Some(tx) = reply_tx {
                                                let _ = tx.send(Ok(ex));
                                            }
                                        }
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                        }
                                    }
                                }
                                None => return,
                            }
                        }

                        _ = pipeline_cancel.cancelled() => {
                            // Shutdown: flush open aggregations, then drain any
                            // late completions already queued.
                            {
                                let guard = agg
                                    .lock()
                                    .expect("mutex poisoned: another thread panicked while holding this lock");
                                guard.force_complete_all();
                            }
                            let mut rx_guard = late_rx.lock().await;
                            while let Ok(late_ex) = rx_guard.try_recv() {
                                let pipe = post_pipeline.load();
                                let _ = pipe.0.clone().oneshot(late_ex).await;
                            }
                            break;
                        }
                    }
                }
            });

            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist");
            managed.consumer_handle = Some(consumer_handle);
            managed.pipeline_handle = Some(pipeline_handle);
            managed.channel_sender = Some(tx_for_storage);

            info!(route_id = %route_id, "Route started (aggregate with timeout)");
            return Ok(());
        }
        // Plain (non-aggregate) routes: one consumer task plus a pipeline
        // task shaped by the effective concurrency model.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            false,
        );

        let pipeline_handle = match effective_concurrency {
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Fresh clone per exchange so hot-swap takes effect.
                        let mut pipeline = pipeline.load().0.clone();

                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            ConcurrencyModel::Concurrent { max } => {
                // Optional semaphore caps the number of in-flight exchanges.
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        // One task per exchange; the permit (when limited) is
                        // held for the exchange's full processing.
                        tokio::spawn(async move {
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            let mut pipe = pipe_ref.load().0.clone();

                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }

    /// Stops both tasks of a route (delegated to consumer management).
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }

    /// Full restart: stop, brief settling pause, start.
    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await?;
        // Short pause lets the stopped tasks fully unwind before restart.
        tokio::time::sleep(Duration::from_millis(100)).await;
        self.start_route(route_id).await
    }

    /// Suspends a route: cancels and awaits only the consumer task, leaving
    /// the pipeline running so queued exchanges finish draining.
    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        let managed = self
            .routes
            .get_mut(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);

        // Only a fully Started route can be suspended.
        if !consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot suspend route '{}' with execution lifecycle {}",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_cancel_token.cancel();

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        let consumer_handle = managed.consumer_handle.take();

        // Bounded wait for the consumer to exit; a timeout is logged but
        // does not fail the suspend.
        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
            if let Some(handle) = consumer_handle {
                let _ = handle.await;
            }
        })
        .await;

        if timeout_result.is_err() {
            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
        }

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Fresh token so a later resume gets an uncancelled one.
        managed.consumer_cancel_token = CancellationToken::new();

        info!(route_id = %route_id, "Route suspended (pipeline still running)");
        Ok(())
    }

    /// Resumes a suspended route by spawning a new consumer that feeds the
    /// still-running pipeline through the retained channel sender.
    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        let managed = self
            .routes
            .get(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);
        if consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        // The sender stored at start_route keeps the original channel open.
        let sender = managed.channel_sender.clone().ok_or_else(|| {
            CamelError::RouteError("Suspended route has no channel sender".into())
        })?;

        let from_uri = managed.from_uri.clone();

        info!(route_id = %route_id, "Resuming route (spawning consumer only)");

        let (consumer, _) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            ),
        )?;

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        let consumer_cancel = managed.consumer_cancel_token.child_token();

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());

        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            true,
        );

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);

        info!(route_id = %route_id, "Route resumed");
        Ok(())
    }

    /// Starts all auto-startup routes in ascending startup order, collecting
    /// failures so one bad route does not block the rest.
    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .filter(|(_, r)| r.definition.auto_startup())
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| *order);
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Starting {} auto-startup routes", route_ids.len());

        let mut errors: Vec<String> = Vec::new();
        for route_id in route_ids {
            if let Err(e) = self.start_route(&route_id).await {
                errors.push(format!("Route '{}': {}", route_id, e));
            }
        }

        if !errors.is_empty() {
            return Err(CamelError::RouteError(format!(
                "Failed to start routes: {}",
                errors.join(", ")
            )));
        }

        info!("All auto-startup routes started");
        Ok(())
    }

    /// Stops every route in reverse startup order; per-route stop errors
    /// are ignored so shutdown always proceeds.
    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Stopping {} routes", route_ids.len());

        for route_id in route_ids {
            let _ = self.stop_route(&route_id).await;
        }

        info!("All routes stopped");
        Ok(())
    }
}
1339
// Unit tests live in a sibling file to keep this module focused.
#[cfg(test)]
#[path = "route_controller_tests.rs"]
mod tests;