1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21 BoxProcessor, CamelError, Exchange, IdentityProcessor, LeaderElector, NoOpMetrics,
22 NoopLeaderElector, ProducerContext, RouteController, RuntimeCommand, RuntimeHandle,
23};
24use camel_component_api::{
25 ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
26};
27use camel_endpoint::parse_uri;
28pub use camel_processor::aggregator::SharedLanguageRegistry;
29use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
30use camel_processor::circuit_breaker::CircuitBreakerLayer;
31use camel_processor::error_handler::ErrorHandlerLayer;
32
33use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
34use crate::lifecycle::adapters::route_compiler::{
35 compose_pipeline, compose_traced_pipeline_with_contracts,
36};
37use crate::lifecycle::application::route_definition::{
38 BuilderStep, RouteDefinition, RouteDefinitionInfo,
39};
40use crate::shared::components::domain::Registry;
41use crate::shared::observability::domain::{DetailLevel, TracerConfig};
42use arc_swap::ArcSwap;
43use camel_bean::BeanRegistry;
44
/// Message sent over the crash-notifier channel when a route's consumer task
/// terminates abnormally (see `set_crash_notifier` on the controller).
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// Identifier of the route whose consumer crashed.
    pub route_id: String,
    /// Human-readable description of the failure.
    pub error: String,
}
53
/// Newtype wrapper that lets a `BoxProcessor` be stored inside `ArcSwap`
/// (which requires its payload to be `Sync`).
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: NOTE(review) — this asserts `BoxProcessor` may be shared across
// threads even though it is not auto-`Sync`. All call sites in this file only
// ever clone the inner processor out of the swap before invoking it
// (`pipeline.load().0.clone()`), so the shared reference itself is never used
// for a call; confirm `BoxProcessor`'s internals actually uphold this.
unsafe impl Sync for SyncBoxProcessor {}
70
71type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
72
/// Segments of a route that contains a top-level aggregate step with a
/// timeout condition: such routes run as pre-pipeline -> aggregator ->
/// post-pipeline instead of a single composed pipeline.
pub(super) struct AggregateSplitInfo {
    /// Steps before the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    /// Steps after the aggregate step (also used for timed-out "late" exchanges).
    pub(super) post_pipeline: SharedPipeline,
}
79
/// Runtime state for a single route owned by the controller.
pub(super) struct ManagedRoute {
    /// Immutable snapshot of the route definition (auto-startup flag,
    /// startup order, source hash).
    pub(super) definition: RouteDefinitionInfo,
    /// Consumer endpoint URI the route reads from.
    pub(super) from_uri: String,
    /// Hot-swappable compiled pipeline (see `swap_pipeline`).
    pub(super) pipeline: SharedPipeline,
    /// Route-level concurrency override; falls back to the consumer's own
    /// model when `None` (see `start_route`).
    pub(super) concurrency: Option<ConcurrencyModel>,
    /// Task driving the consumer; `None` until the route is started.
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    /// Task draining the exchange channel into the pipeline.
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    /// Cancels only the consumer task (used by suspend; replaced with a fresh
    /// token afterwards so the route can be resumed).
    pub(super) consumer_cancel_token: CancellationToken,
    /// Cancels the pipeline task (used by stop).
    pub(super) pipeline_cancel_token: CancellationToken,
    /// Sender side of the consumer->pipeline channel, retained so a resumed
    /// consumer can feed the still-running pipeline.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// In-flight exchange counter; present only when a unit-of-work layer is
    /// configured for the route.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Pre/post pipeline segments for aggregate-with-timeout routes.
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    /// Shared aggregator service, populated when such a route starts.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
107
108pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
109 handle.as_ref().is_some_and(|h| !h.is_finished())
110}
111
112fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
113 match (
114 handle_is_running(&managed.consumer_handle),
115 handle_is_running(&managed.pipeline_handle),
116 ) {
117 (true, true) => "Started",
118 (false, true) => "Suspended",
119 (true, false) => "Stopping",
120 (false, false) => "Stopped",
121 }
122}
123
124fn find_top_level_aggregate_with_timeout(
125 steps: &[BuilderStep],
126) -> Option<(usize, AggregatorConfig)> {
127 for (i, step) in steps.iter().enumerate() {
128 if let BuilderStep::Aggregate { config } = step {
129 if has_timeout_condition(&config.completion) {
130 return Some((i, config.clone()));
131 }
132 break;
133 }
134 }
135 None
136}
137
/// `ComponentContext` implementation that exposes the controller's shared
/// registries and services to components during endpoint resolution.
pub(crate) struct ControllerComponentContext {
    // Shared component registry (scheme -> component).
    registry: Arc<std::sync::Mutex<Registry>>,
    // Shared expression-language registry.
    languages: SharedLanguageRegistry,
    // Metrics sink handed to components.
    metrics: Arc<dyn MetricsCollector>,
    // Leader elector handed to components.
    leader_elector: Arc<dyn LeaderElector>,
}
144
impl ControllerComponentContext {
    /// Bundles the shared registries, metrics sink and leader elector a
    /// component needs during endpoint/producer resolution.
    pub(crate) fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        metrics: Arc<dyn MetricsCollector>,
        leader_elector: Arc<dyn LeaderElector>,
    ) -> Self {
        Self {
            registry,
            languages,
            metrics,
            leader_elector,
        }
    }
}
160
161impl ComponentContext for ControllerComponentContext {
162 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
163 self.registry.lock().ok()?.get(scheme)
164 }
165
166 fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
167 self.languages.lock().ok()?.get(name).cloned()
168 }
169
170 fn metrics(&self) -> Arc<dyn MetricsCollector> {
171 Arc::clone(&self.metrics)
172 }
173
174 fn leader_elector(&self) -> Arc<dyn LeaderElector> {
175 Arc::clone(&self.leader_elector)
176 }
177}
178
179fn is_pending(ex: &Exchange) -> bool {
180 ex.property("CamelAggregatorPending")
181 .and_then(|v| v.as_bool())
182 .unwrap_or(false)
183}
184
/// Waits until `pipeline` reports ready, retrying with a fixed 1-second
/// backoff while the circuit breaker is open.
///
/// Returns:
/// - `Ok(())` once the pipeline is ready to accept a call;
/// - the original `CircuitOpen` error if `cancel` fires during backoff;
/// - any other readiness error immediately (no retry).
async fn ready_with_backoff(
    pipeline: &mut BoxProcessor,
    cancel: &CancellationToken,
) -> Result<(), CamelError> {
    loop {
        match pipeline.ready().await {
            Ok(_) => return Ok(()),
            Err(CamelError::CircuitOpen(ref msg)) => {
                warn!("Circuit open, backing off: {msg}");
                tokio::select! {
                    // Fixed (non-exponential) backoff before probing again.
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                        continue;
                    }
                    // Shutdown requested mid-backoff: surface the circuit
                    // error so the caller stops cleanly instead of spinning.
                    _ = cancel.cancelled() => {
                        return Err(CamelError::CircuitOpen(msg.clone()));
                    }
                }
            }
            Err(e) => {
                error!("Pipeline not ready: {e}");
                return Err(e);
            }
        }
    }
}
217
218fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
219 let stamp = std::time::SystemTime::now()
220 .duration_since(std::time::UNIX_EPOCH)
221 .unwrap_or_default()
222 .as_nanos();
223 RuntimeCommand::FailRoute {
224 route_id: route_id.to_string(),
225 error: error.to_string(),
226 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
227 causation_id: None,
228 }
229}
230
231pub(super) async fn publish_runtime_failure(
232 runtime: Option<Weak<dyn RuntimeHandle>>,
233 route_id: &str,
234 error: &str,
235) {
236 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
237 return;
238 };
239 let command = runtime_failure_command(route_id, error);
240 if let Err(runtime_error) = runtime.execute(command).await {
241 warn!(
242 route_id = %route_id,
243 error = %runtime_error,
244 "failed to synchronize route crash with runtime projection"
245 );
246 }
247}
248
/// Default implementation of `RouteController`: owns the managed routes and
/// the shared registries/services used to compile and run them.
pub struct DefaultRouteController {
    /// All routes known to the controller, keyed by route id.
    routes: HashMap<String, ManagedRoute>,
    /// Shared component registry (scheme -> component).
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared expression-language registry.
    languages: SharedLanguageRegistry,
    /// Shared bean registry used during step resolution.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Weak handle back to the runtime (weak to avoid a reference cycle).
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Controller-wide error handler, used when a route declares none.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Channel notified when a consumer task crashes.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether compiled pipelines are wrapped with tracing.
    tracing_enabled: bool,
    /// Verbosity of the tracer when enabled.
    tracer_detail_level: DetailLevel,
    /// Metrics sink for tracing/components; `NoOpMetrics` is used when unset.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    /// Leader elector handed to components.
    leader_elector: Arc<dyn LeaderElector>,
}
279
280impl DefaultRouteController {
281 pub fn new(
283 registry: Arc<std::sync::Mutex<Registry>>,
284 leader_elector: Arc<dyn LeaderElector>,
285 ) -> Self {
286 Self::with_beans_and_leader(
287 registry,
288 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
289 leader_elector,
290 )
291 }
292
293 pub fn with_beans(
295 registry: Arc<std::sync::Mutex<Registry>>,
296 beans: Arc<std::sync::Mutex<BeanRegistry>>,
297 ) -> Self {
298 Self::with_beans_and_leader(registry, beans, Arc::new(NoopLeaderElector))
299 }
300
301 fn with_beans_and_leader(
302 registry: Arc<std::sync::Mutex<Registry>>,
303 beans: Arc<std::sync::Mutex<BeanRegistry>>,
304 leader_elector: Arc<dyn LeaderElector>,
305 ) -> Self {
306 Self {
307 routes: HashMap::new(),
308 registry,
309 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
310 beans,
311 runtime: None,
312 global_error_handler: None,
313 crash_notifier: None,
314 tracing_enabled: false,
315 tracer_detail_level: DetailLevel::Minimal,
316 tracer_metrics: None,
317 leader_elector,
318 }
319 }
320
321 pub fn with_languages(
323 registry: Arc<std::sync::Mutex<Registry>>,
324 languages: SharedLanguageRegistry,
325 leader_elector: Arc<dyn LeaderElector>,
326 ) -> Self {
327 Self {
328 routes: HashMap::new(),
329 registry,
330 languages,
331 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
332 runtime: None,
333 global_error_handler: None,
334 crash_notifier: None,
335 tracing_enabled: false,
336 tracer_detail_level: DetailLevel::Minimal,
337 tracer_metrics: None,
338 leader_elector,
339 }
340 }
341
342 pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
344 self.runtime = Some(Arc::downgrade(&runtime));
345 }
346
347 pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
352 self.crash_notifier = Some(tx);
353 }
354
355 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
357 self.global_error_handler = Some(config);
358 }
359
360 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
362 self.tracing_enabled = config.enabled;
363 self.tracer_detail_level = config.detail_level.clone();
364 self.tracer_metrics = config.metrics_collector.clone();
365 }
366
367 fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
368 let mut producer_ctx = ProducerContext::new();
369 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
370 producer_ctx = producer_ctx.with_runtime(runtime);
371 }
372 Ok(producer_ctx)
373 }
374
375 fn resolve_error_handler(
377 &self,
378 config: ErrorHandlerConfig,
379 producer_ctx: &ProducerContext,
380 component_ctx: &dyn ComponentContext,
381 ) -> Result<ErrorHandlerLayer, CamelError> {
382 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
384 let parsed = parse_uri(uri)?;
385 let component = component_ctx
386 .resolve_component(&parsed.scheme)
387 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
388 let endpoint = component.create_endpoint(uri, component_ctx)?;
389 Some(endpoint.create_producer(producer_ctx)?)
390 } else {
391 None
392 };
393
394 let mut resolved_policies = Vec::new();
396 for policy in config.policies {
397 let handler_producer = if let Some(ref uri) = policy.handled_by {
398 let parsed = parse_uri(uri)?;
399 let component = component_ctx
400 .resolve_component(&parsed.scheme)
401 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
402 let endpoint = component.create_endpoint(uri, component_ctx)?;
403 Some(endpoint.create_producer(producer_ctx)?)
404 } else {
405 None
406 };
407 resolved_policies.push((policy, handler_producer));
408 }
409
410 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
411 }
412
413 fn resolve_uow_layer(
416 &self,
417 config: &UnitOfWorkConfig,
418 producer_ctx: &ProducerContext,
419 component_ctx: &dyn ComponentContext,
420 counter: Option<Arc<AtomicU64>>,
421 ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
422 let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
423 let parsed = parse_uri(uri)?;
424 let component = component_ctx
425 .resolve_component(&parsed.scheme)
426 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
427 let endpoint = component.create_endpoint(uri, component_ctx)?;
428 endpoint.create_producer(producer_ctx).map_err(|e| {
429 CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
430 })
431 };
432
433 let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
434 let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
435
436 let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
437 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
438 Ok((layer, counter))
439 }
440
441 pub(crate) fn resolve_steps(
443 &self,
444 steps: Vec<BuilderStep>,
445 producer_ctx: &ProducerContext,
446 registry: &Arc<std::sync::Mutex<Registry>>,
447 ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
448 let component_ctx = Arc::new(ControllerComponentContext::new(
449 Arc::clone(registry),
450 Arc::clone(&self.languages),
451 self.tracer_metrics
452 .clone()
453 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
454 Arc::clone(&self.leader_elector),
455 ));
456
457 super::step_resolution::resolve_steps(
458 steps,
459 producer_ctx,
460 registry,
461 &self.languages,
462 &self.beans,
463 component_ctx,
464 )
465 }
466
467 pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
477 let route_id = definition.route_id().to_string();
478
479 if self.routes.contains_key(&route_id) {
480 return Err(CamelError::RouteError(format!(
481 "Route '{}' already exists",
482 route_id
483 )));
484 }
485
486 info!(route_id = %route_id, "Adding route to controller");
487
488 let definition_info = definition.to_info();
490 let RouteDefinition {
491 from_uri,
492 steps,
493 error_handler,
494 circuit_breaker,
495 unit_of_work,
496 concurrency,
497 ..
498 } = definition;
499
500 let producer_ctx = self.build_producer_context()?;
502
503 let mut aggregate_split: Option<AggregateSplitInfo> = None;
505 let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
506 Some((idx, agg_config)) => {
507 let mut pre_steps = steps;
508 let mut rest = pre_steps.split_off(idx);
509 let _agg_step = rest.remove(0);
510 let post_steps = rest;
511
512 let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
513 let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
514 let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
515 compose_pipeline(pre_procs),
516 )));
517
518 let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
519 let post_procs: Vec<BoxProcessor> =
520 post_pairs.into_iter().map(|(p, _)| p).collect();
521 let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
522 compose_pipeline(post_procs),
523 )));
524
525 aggregate_split = Some(AggregateSplitInfo {
526 pre_pipeline,
527 agg_config,
528 post_pipeline,
529 });
530
531 vec![]
532 }
533 None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
534 };
535 let route_id_for_tracing = route_id.clone();
536 let mut pipeline = if processors_with_contracts.is_empty() {
537 BoxProcessor::new(IdentityProcessor)
538 } else {
539 compose_traced_pipeline_with_contracts(
540 processors_with_contracts,
541 &route_id_for_tracing,
542 self.tracing_enabled,
543 self.tracer_detail_level.clone(),
544 self.tracer_metrics.clone(),
545 )
546 };
547
548 if let Some(cb_config) = circuit_breaker {
550 let cb_layer = CircuitBreakerLayer::new(cb_config);
551 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
552 }
553
554 let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
556
557 if let Some(config) = eh_config {
558 let component_ctx = ControllerComponentContext::new(
559 Arc::clone(&self.registry),
560 Arc::clone(&self.languages),
561 self.tracer_metrics
562 .clone()
563 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
564 Arc::clone(&self.leader_elector),
565 );
566 let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
567 pipeline = BoxProcessor::new(layer.layer(pipeline));
568 }
569
570 let uow_counter = if let Some(uow_config) = &unit_of_work {
572 let component_ctx = ControllerComponentContext::new(
573 Arc::clone(&self.registry),
574 Arc::clone(&self.languages),
575 self.tracer_metrics
576 .clone()
577 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
578 Arc::clone(&self.leader_elector),
579 );
580 let (uow_layer, counter) =
581 self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
582 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
583 Some(counter)
584 } else {
585 None
586 };
587
588 self.routes.insert(
589 route_id.clone(),
590 ManagedRoute {
591 definition: definition_info,
592 from_uri,
593 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
594 concurrency,
595 consumer_handle: None,
596 pipeline_handle: None,
597 consumer_cancel_token: CancellationToken::new(),
598 pipeline_cancel_token: CancellationToken::new(),
599 channel_sender: None,
600 in_flight: uow_counter,
601 aggregate_split,
602 agg_service: None,
603 },
604 );
605
606 Ok(())
607 }
608
609 pub fn compile_route_definition(
614 &self,
615 def: RouteDefinition,
616 ) -> Result<BoxProcessor, CamelError> {
617 let route_id = def.route_id().to_string();
618
619 let producer_ctx = self.build_producer_context()?;
620
621 let processors_with_contracts =
622 self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
623 let mut pipeline = compose_traced_pipeline_with_contracts(
624 processors_with_contracts,
625 &route_id,
626 self.tracing_enabled,
627 self.tracer_detail_level.clone(),
628 self.tracer_metrics.clone(),
629 );
630
631 if let Some(cb_config) = def.circuit_breaker {
632 let cb_layer = CircuitBreakerLayer::new(cb_config);
633 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
634 }
635
636 let eh_config = def
637 .error_handler
638 .clone()
639 .or_else(|| self.global_error_handler.clone());
640 if let Some(config) = eh_config {
641 let component_ctx = ControllerComponentContext::new(
642 Arc::clone(&self.registry),
643 Arc::clone(&self.languages),
644 self.tracer_metrics
645 .clone()
646 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
647 Arc::clone(&self.leader_elector),
648 );
649 let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
650 pipeline = BoxProcessor::new(layer.layer(pipeline));
651 }
652
653 if let Some(uow_config) = &def.unit_of_work {
655 let existing_counter = self
656 .routes
657 .get(&route_id)
658 .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
659
660 let component_ctx = ControllerComponentContext::new(
661 Arc::clone(&self.registry),
662 Arc::clone(&self.languages),
663 self.tracer_metrics
664 .clone()
665 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
666 Arc::clone(&self.leader_elector),
667 );
668
669 let (uow_layer, _counter) = self.resolve_uow_layer(
670 uow_config,
671 &producer_ctx,
672 &component_ctx,
673 existing_counter,
674 )?;
675
676 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
677 }
678
679 Ok(pipeline)
680 }
681
682 pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
688 let managed = self.routes.get(route_id).ok_or_else(|| {
689 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
690 })?;
691 if handle_is_running(&managed.consumer_handle)
692 || handle_is_running(&managed.pipeline_handle)
693 {
694 return Err(CamelError::RouteError(format!(
695 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
696 route_id,
697 inferred_lifecycle_label(managed)
698 )));
699 }
700 self.routes.remove(route_id);
701 info!(route_id = %route_id, "Route removed from controller");
702 Ok(())
703 }
704
705 pub fn route_count(&self) -> usize {
707 self.routes.len()
708 }
709
710 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
711 self.routes.get(route_id).map(|r| {
712 r.in_flight
713 .as_ref()
714 .map_or(0, |c| c.load(Ordering::Relaxed))
715 })
716 }
717
718 pub fn route_exists(&self, route_id: &str) -> bool {
720 self.routes.contains_key(route_id)
721 }
722
723 pub fn route_ids(&self) -> Vec<String> {
725 self.routes.keys().cloned().collect()
726 }
727
728 pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
729 self.routes
730 .get(route_id)
731 .and_then(|m| m.definition.source_hash())
732 }
733
734 pub fn auto_startup_route_ids(&self) -> Vec<String> {
736 let mut pairs: Vec<(String, i32)> = self
737 .routes
738 .iter()
739 .filter(|(_, managed)| managed.definition.auto_startup())
740 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
741 .collect();
742 pairs.sort_by_key(|(_, order)| *order);
743 pairs.into_iter().map(|(id, _)| id).collect()
744 }
745
746 pub fn shutdown_route_ids(&self) -> Vec<String> {
748 let mut pairs: Vec<(String, i32)> = self
749 .routes
750 .iter()
751 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
752 .collect();
753 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
754 pairs.into_iter().map(|(id, _)| id).collect()
755 }
756
757 pub fn swap_pipeline(
762 &self,
763 route_id: &str,
764 new_pipeline: BoxProcessor,
765 ) -> Result<(), CamelError> {
766 let managed = self
767 .routes
768 .get(route_id)
769 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
770
771 if managed.aggregate_split.is_some() {
772 tracing::warn!(
773 route_id = %route_id,
774 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
775 );
776 }
777
778 managed
779 .pipeline
780 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
781 info!(route_id = %route_id, "Pipeline swapped atomically");
782 Ok(())
783 }
784
785 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
787 self.routes.get(route_id).map(|r| r.from_uri.clone())
788 }
789
790 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
795 self.routes
796 .get(route_id)
797 .map(|r| r.pipeline.load().0.clone())
798 }
799
800 async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
802 super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
803 }
804
805 pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
806 self.start_route(route_id).await
807 }
808
809 pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
810 self.stop_route(route_id).await
811 }
812}
813
#[async_trait::async_trait]
impl RouteController for DefaultRouteController {
    /// Starts a stopped route: spawns its consumer task and its pipeline
    /// task, connected by a bounded channel. Idempotent when the route is
    /// already fully started; refuses suspended or inconsistent states.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            // Fully running already: nothing to do.
            if consumer_running && pipeline_running {
                return Ok(());
            }
            // Pipeline alive but consumer dead == suspended; requires resume.
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            // Consumer alive without a pipeline is never a valid state.
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
        }

        info!(route_id = %route_id, "Starting route");

        // Snapshot what the spawned tasks need before taking &mut borrows.
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.leader_elector),
            ),
        )?;

        // Route-level override beats the consumer's own concurrency model.
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Bounded consumer->pipeline channel; capacity 256 provides
        // backpressure on the consumer when the pipeline lags.
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        // Child tokens so suspend/stop can cancel independently while the
        // parent tokens stay reusable.
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        // Kept on ManagedRoute so a later resume can reattach a consumer.
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        // NOTE(review): this re-fetch is redundant with the `managed` binding
        // above (no intervening mutation of self.routes); kept as-is.
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Aggregate-with-timeout routes get a dedicated pipeline loop that
        // drives pre-pipeline -> aggregator -> post-pipeline and also drains
        // "late" (timed-out) aggregates emitted by the aggregator service.
        if let Some(split) = managed.aggregate_split.as_ref() {
            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);

            let route_cancel_clone = pipeline_cancel.clone();
            let svc = AggregatorService::new(
                split.agg_config.clone(),
                late_tx,
                Arc::clone(&self.languages),
                route_cancel_clone,
            );
            let agg = Arc::new(std::sync::Mutex::new(svc));

            // Exposed on the route so other code can reach the aggregator.
            managed.agg_service = Some(Arc::clone(&agg));

            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
            let pre_pipeline = Arc::clone(&split.pre_pipeline);
            let post_pipeline = Arc::clone(&split.post_pipeline);

            let consumer_handle = super::consumer_management::spawn_consumer_task(
                route_id.to_string(),
                consumer,
                consumer_ctx,
                crash_notifier,
                runtime_for_consumer,
                false,
            );

            let pipeline_handle = tokio::spawn(async move {
                loop {
                    tokio::select! {
                        // `biased` => arms are polled in declaration order:
                        // timed-out aggregates drain before new envelopes.
                        biased;

                        late_ex = async {
                            let mut rx = late_rx.lock().await;
                            rx.recv().await
                        } => {
                            match late_ex {
                                Some(ex) => {
                                    // Timed-out aggregate: run only the post
                                    // segment (pre already ran on ingest).
                                    let pipe = post_pipeline.load();
                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
                                    }
                                }
                                // Aggregator dropped its late sender: done.
                                None => return,
                            }
                        }

                        envelope_opt = rx.recv() => {
                            match envelope_opt {
                                Some(envelope) => {
                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
                                    // 1) pre-pipeline
                                    let pre_pipe = pre_pipeline.load();
                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
                                        Ok(ex) => ex,
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                            continue;
                                        }
                                    };

                                    // 2) aggregator — clone the service out of
                                    // the std mutex so the guard is not held
                                    // across the await below.
                                    let ex = {
                                        let cloned_svc = agg
                                            .lock()
                                            .expect("mutex poisoned: another thread panicked while holding this lock")
                                            .clone();
                                        cloned_svc.oneshot(ex).await
                                    };

                                    // 3) post-pipeline, but only for completed
                                    // aggregates; pending ones are returned
                                    // as-is to the replier.
                                    match ex {
                                        Ok(ex) => {
                                            if !is_pending(&ex) {
                                                let post_pipe = post_pipeline.load();
                                                let out = post_pipe.0.clone().oneshot(ex).await;
                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
                                            } else if let Some(tx) = reply_tx {
                                                let _ = tx.send(Ok(ex));
                                            }
                                        }
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                        }
                                    }
                                }
                                // Channel closed (consumer gone): exit loop.
                                None => return,
                            }
                        }

                        _ = pipeline_cancel.cancelled() => {
                            // Graceful shutdown: flush partial aggregates,
                            // then drain any late exchanges already queued.
                            {
                                let guard = agg
                                    .lock()
                                    .expect("mutex poisoned: another thread panicked while holding this lock");
                                guard.force_complete_all();
                            }
                            let mut rx_guard = late_rx.lock().await;
                            while let Ok(late_ex) = rx_guard.try_recv() {
                                let pipe = post_pipeline.load();
                                let _ = pipe.0.clone().oneshot(late_ex).await;
                            }
                            break;
                        }
                    }
                }
            });

            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist");
            managed.consumer_handle = Some(consumer_handle);
            managed.pipeline_handle = Some(pipeline_handle);
            managed.channel_sender = Some(tx_for_storage);

            info!(route_id = %route_id, "Route started (aggregate with timeout)");
            return Ok(());
        }
        // Non-aggregate path: one consumer task plus one pipeline task whose
        // shape depends on the effective concurrency model.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            false,
        );

        let pipeline_handle = match effective_concurrency {
            // One exchange at a time, in channel order.
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Re-load each iteration so hot-swapped pipelines
                        // take effect on the next exchange.
                        let mut pipeline = pipeline.load().0.clone();

                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            // Fire-and-forget exchange: log unless the error
                            // is just the shutdown signal.
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            // One spawned task per exchange, optionally bounded by a
            // semaphore when `max` is set.
            ConcurrencyModel::Concurrent { max } => {
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        tokio::spawn(async move {
                            // Permit held for the whole exchange; dropped
                            // automatically when this task ends.
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            let mut pipe = pipe_ref.load().0.clone();

                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }

    /// Stops both route tasks (delegated to consumer management).
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }

    /// Full stop followed by start, with a short grace period in between.
    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await?;
        // NOTE(review): fixed 100ms pause between stop and start; presumably
        // lets in-flight teardown settle — confirm whether it is still needed.
        tokio::time::sleep(Duration::from_millis(100)).await;
        self.start_route(route_id).await
    }

    /// Suspends a fully running route: cancels and awaits only the consumer
    /// task, leaving the pipeline task and its channel alive so `resume_route`
    /// can reattach a fresh consumer later.
    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        let managed = self
            .routes
            .get_mut(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);

        // Only a fully Started route can be suspended.
        if !consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot suspend route '{}' with execution lifecycle {}",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_cancel_token.cancel();

        // NOTE(review): redundant re-fetch (no intervening route mutation).
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        let consumer_handle = managed.consumer_handle.take();

        // Wait up to 30s for the consumer to wind down; on timeout we only
        // warn — the handle has already been taken, so the route still reads
        // as suspended.
        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
            if let Some(handle) = consumer_handle {
                let _ = handle.await;
            }
        })
        .await;

        if timeout_result.is_err() {
            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
        }

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Replace the cancelled token so a future resume gets a fresh one.
        managed.consumer_cancel_token = CancellationToken::new();

        info!(route_id = %route_id, "Route suspended (pipeline still running)");
        Ok(())
    }

    /// Resumes a suspended route by spawning a new consumer that feeds the
    /// still-running pipeline through the retained channel sender.
    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        let managed = self
            .routes
            .get(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);
        // Valid only from the Suspended state (pipeline alive, consumer not).
        if consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        // Reuse the sender stored at start so the new consumer feeds the
        // existing pipeline task.
        let sender = managed.channel_sender.clone().ok_or_else(|| {
            CamelError::RouteError("Suspended route has no channel sender".into())
        })?;

        let from_uri = managed.from_uri.clone();

        info!(route_id = %route_id, "Resuming route (spawning consumer only)");

        let (consumer, _) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.leader_elector),
            ),
        )?;

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Fresh child of the (replaced-on-suspend) consumer token.
        let consumer_cancel = managed.consumer_cancel_token.child_token();

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());

        // Last arg differs from start_route (`true` here, `false` there) —
        // marks this spawn as a resume to consumer management.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            true,
        );

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);

        info!(route_id = %route_id, "Route resumed");
        Ok(())
    }

    /// Starts all auto-startup routes in ascending startup order; collects
    /// per-route failures and reports them together at the end.
    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .filter(|(_, r)| r.definition.auto_startup())
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| *order);
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Starting {} auto-startup routes", route_ids.len());

        // Keep going on failure so one bad route doesn't block the rest.
        let mut errors: Vec<String> = Vec::new();
        for route_id in route_ids {
            if let Err(e) = self.start_route(&route_id).await {
                errors.push(format!("Route '{}': {}", route_id, e));
            }
        }

        if !errors.is_empty() {
            return Err(CamelError::RouteError(format!(
                "Failed to start routes: {}",
                errors.join(", ")
            )));
        }

        info!("All auto-startup routes started");
        Ok(())
    }

    /// Stops every route in reverse startup order; individual stop failures
    /// are ignored so shutdown always completes.
    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Stopping {} routes", route_ids.len());

        for route_id in route_ids {
            let _ = self.stop_route(&route_id).await;
        }

        info!("All routes stopped");
        Ok(())
    }
}
1335
// Unit tests live in a sibling file, pulled in only for test builds.
#[cfg(test)]
#[path = "route_controller_tests.rs"]
mod tests;