1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21 BoxProcessor, CamelError, Exchange, IdentityProcessor, NoOpMetrics, ProducerContext,
22 RouteController, RuntimeCommand, RuntimeHandle,
23};
24use camel_component_api::{
25 ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
26};
27use camel_endpoint::parse_uri;
28pub use camel_processor::aggregator::SharedLanguageRegistry;
29use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
30use camel_processor::circuit_breaker::CircuitBreakerLayer;
31use camel_processor::error_handler::ErrorHandlerLayer;
32
33use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
34use crate::lifecycle::adapters::route_compiler::{
35 compose_pipeline, compose_traced_pipeline_with_contracts,
36};
37use crate::lifecycle::application::route_definition::{
38 BuilderStep, RouteDefinition, RouteDefinitionInfo,
39};
40use crate::shared::components::domain::Registry;
41use crate::shared::observability::domain::{DetailLevel, TracerConfig};
42use arc_swap::ArcSwap;
43use camel_bean::BeanRegistry;
44
/// Message delivered through the crash-notifier channel when a route's
/// consumer task terminates unexpectedly (see `set_crash_notifier`).
#[derive(Debug, Clone)]
pub struct CrashNotification {
    // Id of the route whose consumer crashed.
    pub route_id: String,
    // Human-readable description of the failure.
    pub error: String,
}
53
/// Newtype around `BoxProcessor` so a pipeline can live inside an
/// `ArcSwap` (which requires its pointee to be `Sync`).
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: NOTE(review): this asserts `&BoxProcessor` may be shared across
// threads even though `BoxProcessor` is not auto-`Sync`. All uses in this
// file only read through the wrapper (`.0.clone()` before calling), but
// soundness ultimately depends on `BoxProcessor`'s internals — confirm
// against its definition in camel_api.
unsafe impl Sync for SyncBoxProcessor {}

/// Atomically hot-swappable pipeline handle: `load()` for lock-free reads
/// in the pipeline tasks, `store()` to swap in a recompiled pipeline
/// (see `swap_pipeline`).
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
72
/// Pipeline segments for a route that was split around a top-level
/// aggregate step with a timeout completion condition (see `add_route`).
pub(super) struct AggregateSplitInfo {
    // Steps before the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    // Configuration of the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    // Steps after the aggregate step.
    pub(super) post_pipeline: SharedPipeline,
}
79
/// Runtime state the controller tracks per registered route.
pub(super) struct ManagedRoute {
    // Immutable snapshot of the route definition's metadata.
    pub(super) definition: RouteDefinitionInfo,
    // Source endpoint URI the consumer is created from.
    pub(super) from_uri: String,
    // Hot-swappable compiled pipeline (identity pipeline for aggregate-split routes).
    pub(super) pipeline: SharedPipeline,
    // Route-level concurrency override; falls back to the consumer's default.
    pub(super) concurrency: Option<ConcurrencyModel>,
    // Task feeding exchanges from the endpoint into the channel; None when stopped.
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    // Task draining the channel through the pipeline; None when stopped.
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    // Cancels only the consumer task (suspend keeps the pipeline alive).
    pub(super) consumer_cancel_token: CancellationToken,
    // Cancels the pipeline task (full stop).
    pub(super) pipeline_cancel_token: CancellationToken,
    // Sender side of the consumer→pipeline channel, kept so resume can re-attach.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    // In-flight exchange counter, present only when a unit-of-work layer is configured.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    // Present only for aggregate-with-timeout routes (see AggregateSplitInfo).
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    // Live aggregator service for the split path, installed by start_route.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
107
108pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
109 handle.as_ref().is_some_and(|h| !h.is_finished())
110}
111
112fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
113 match (
114 handle_is_running(&managed.consumer_handle),
115 handle_is_running(&managed.pipeline_handle),
116 ) {
117 (true, true) => "Started",
118 (false, true) => "Suspended",
119 (true, false) => "Stopping",
120 (false, false) => "Stopped",
121 }
122}
123
124fn find_top_level_aggregate_with_timeout(
125 steps: &[BuilderStep],
126) -> Option<(usize, AggregatorConfig)> {
127 for (i, step) in steps.iter().enumerate() {
128 if let BuilderStep::Aggregate { config } = step {
129 if has_timeout_condition(&config.completion) {
130 return Some((i, config.clone()));
131 }
132 break;
133 }
134 }
135 None
136}
137
/// `ComponentContext` implementation backed by the controller's component
/// registry, language registry, and metrics collector.
pub(crate) struct ControllerComponentContext {
    // Component registry keyed by URI scheme.
    registry: Arc<std::sync::Mutex<Registry>>,
    // Expression-language registry shared with the aggregator machinery.
    languages: SharedLanguageRegistry,
    // Metrics sink handed out to components.
    metrics: Arc<dyn MetricsCollector>,
}
143
144impl ControllerComponentContext {
145 pub(crate) fn new(
146 registry: Arc<std::sync::Mutex<Registry>>,
147 languages: SharedLanguageRegistry,
148 metrics: Arc<dyn MetricsCollector>,
149 ) -> Self {
150 Self {
151 registry,
152 languages,
153 metrics,
154 }
155 }
156}
157
impl ComponentContext for ControllerComponentContext {
    // Looks up a component by URI scheme. A poisoned registry mutex is
    // silently mapped to `None` via `.ok()?` — callers see "not found"
    // rather than a panic.
    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
        self.registry.lock().ok()?.get(scheme)
    }

    // Looks up an expression language by name; same poisoned-lock-as-None
    // behavior as above.
    fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
        self.languages.lock().ok()?.get(name).cloned()
    }

    // Hands out a shared reference to the metrics collector.
    fn metrics(&self) -> Arc<dyn MetricsCollector> {
        Arc::clone(&self.metrics)
    }
}
171
172fn is_pending(ex: &Exchange) -> bool {
173 ex.property("CamelAggregatorPending")
174 .and_then(|v| v.as_bool())
175 .unwrap_or(false)
176}
177
178async fn ready_with_backoff(
185 pipeline: &mut BoxProcessor,
186 cancel: &CancellationToken,
187) -> Result<(), CamelError> {
188 loop {
189 match pipeline.ready().await {
190 Ok(_) => return Ok(()),
191 Err(CamelError::CircuitOpen(ref msg)) => {
192 warn!("Circuit open, backing off: {msg}");
193 tokio::select! {
194 _ = tokio::time::sleep(Duration::from_secs(1)) => {
195 continue;
196 }
197 _ = cancel.cancelled() => {
198 return Err(CamelError::CircuitOpen(msg.clone()));
200 }
201 }
202 }
203 Err(e) => {
204 error!("Pipeline not ready: {e}");
205 return Err(e);
206 }
207 }
208 }
209}
210
211fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
212 let stamp = std::time::SystemTime::now()
213 .duration_since(std::time::UNIX_EPOCH)
214 .unwrap_or_default()
215 .as_nanos();
216 RuntimeCommand::FailRoute {
217 route_id: route_id.to_string(),
218 error: error.to_string(),
219 command_id: format!("ctrl-fail-{route_id}-{stamp}"),
220 causation_id: None,
221 }
222}
223
224pub(super) async fn publish_runtime_failure(
225 runtime: Option<Weak<dyn RuntimeHandle>>,
226 route_id: &str,
227 error: &str,
228) {
229 let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
230 return;
231 };
232 let command = runtime_failure_command(route_id, error);
233 if let Err(runtime_error) = runtime.execute(command).await {
234 warn!(
235 route_id = %route_id,
236 error = %runtime_error,
237 "failed to synchronize route crash with runtime projection"
238 );
239 }
240}
241
/// Default `RouteController` implementation: owns the managed routes and
/// the registries needed to compile, start, stop, suspend, resume, and
/// hot-swap them.
pub struct DefaultRouteController {
    // All registered routes, keyed by route id.
    routes: HashMap<String, ManagedRoute>,
    // Component registry used to resolve endpoint URIs.
    registry: Arc<std::sync::Mutex<Registry>>,
    // Expression-language registry shared with aggregators.
    languages: SharedLanguageRegistry,
    // Bean registry used during step resolution.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    // Weak runtime handle so the controller does not keep the runtime alive.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    // Controller-wide error handler, used when a route has none of its own.
    global_error_handler: Option<ErrorHandlerConfig>,
    // Channel notified when a consumer task crashes.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    // Whether per-step tracing is compiled into pipelines.
    tracing_enabled: bool,
    // Granularity of the tracer output.
    tracer_detail_level: DetailLevel,
    // Metrics collector for tracing; NoOpMetrics is substituted when absent.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
}
271
272impl DefaultRouteController {
273 pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
275 Self::with_beans(
276 registry,
277 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
278 )
279 }
280
281 pub fn with_beans(
283 registry: Arc<std::sync::Mutex<Registry>>,
284 beans: Arc<std::sync::Mutex<BeanRegistry>>,
285 ) -> Self {
286 Self {
287 routes: HashMap::new(),
288 registry,
289 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
290 beans,
291 runtime: None,
292 global_error_handler: None,
293 crash_notifier: None,
294 tracing_enabled: false,
295 tracer_detail_level: DetailLevel::Minimal,
296 tracer_metrics: None,
297 }
298 }
299
300 pub fn with_languages(
302 registry: Arc<std::sync::Mutex<Registry>>,
303 languages: SharedLanguageRegistry,
304 ) -> Self {
305 Self {
306 routes: HashMap::new(),
307 registry,
308 languages,
309 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
310 runtime: None,
311 global_error_handler: None,
312 crash_notifier: None,
313 tracing_enabled: false,
314 tracer_detail_level: DetailLevel::Minimal,
315 tracer_metrics: None,
316 }
317 }
318
    /// Stores a weak reference to the runtime so the controller never keeps
    /// the runtime alive on its own.
    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.runtime = Some(Arc::downgrade(&runtime));
    }
323
    /// Installs the channel that consumer tasks use to report crashes
    /// (see `CrashNotification`). Replaces any previously set notifier.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }
331
    /// Sets the controller-wide error handler used by routes that do not
    /// define their own. Affects only routes compiled after this call.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }
336
337 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
339 self.tracing_enabled = config.enabled;
340 self.tracer_detail_level = config.detail_level.clone();
341 self.tracer_metrics = config.metrics_collector.clone();
342 }
343
344 fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
345 let mut producer_ctx = ProducerContext::new();
346 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
347 producer_ctx = producer_ctx.with_runtime(runtime);
348 }
349 Ok(producer_ctx)
350 }
351
352 fn resolve_error_handler(
354 &self,
355 config: ErrorHandlerConfig,
356 producer_ctx: &ProducerContext,
357 component_ctx: &dyn ComponentContext,
358 ) -> Result<ErrorHandlerLayer, CamelError> {
359 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
361 let parsed = parse_uri(uri)?;
362 let component = component_ctx
363 .resolve_component(&parsed.scheme)
364 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
365 let endpoint = component.create_endpoint(uri, component_ctx)?;
366 Some(endpoint.create_producer(producer_ctx)?)
367 } else {
368 None
369 };
370
371 let mut resolved_policies = Vec::new();
373 for policy in config.policies {
374 let handler_producer = if let Some(ref uri) = policy.handled_by {
375 let parsed = parse_uri(uri)?;
376 let component = component_ctx
377 .resolve_component(&parsed.scheme)
378 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
379 let endpoint = component.create_endpoint(uri, component_ctx)?;
380 Some(endpoint.create_producer(producer_ctx)?)
381 } else {
382 None
383 };
384 resolved_policies.push((policy, handler_producer));
385 }
386
387 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
388 }
389
390 fn resolve_uow_layer(
393 &self,
394 config: &UnitOfWorkConfig,
395 producer_ctx: &ProducerContext,
396 component_ctx: &dyn ComponentContext,
397 counter: Option<Arc<AtomicU64>>,
398 ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
399 let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
400 let parsed = parse_uri(uri)?;
401 let component = component_ctx
402 .resolve_component(&parsed.scheme)
403 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
404 let endpoint = component.create_endpoint(uri, component_ctx)?;
405 endpoint.create_producer(producer_ctx).map_err(|e| {
406 CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
407 })
408 };
409
410 let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
411 let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
412
413 let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
414 let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
415 Ok((layer, counter))
416 }
417
418 pub(crate) fn resolve_steps(
420 &self,
421 steps: Vec<BuilderStep>,
422 producer_ctx: &ProducerContext,
423 registry: &Arc<std::sync::Mutex<Registry>>,
424 ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
425 let component_ctx = Arc::new(ControllerComponentContext::new(
426 Arc::clone(registry),
427 Arc::clone(&self.languages),
428 self.tracer_metrics
429 .clone()
430 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
431 ));
432
433 super::step_resolution::resolve_steps(
434 steps,
435 producer_ctx,
436 registry,
437 &self.languages,
438 &self.beans,
439 component_ctx,
440 )
441 }
442
    /// Compiles a `RouteDefinition` into a hot-swappable pipeline and
    /// registers it under its route id. The route is NOT started here.
    ///
    /// Layer nesting (innermost → outermost): step pipeline, circuit
    /// breaker, error handler, unit-of-work — so the UoW hooks observe the
    /// outcome after error handling has run.
    ///
    /// Routes containing a top-level aggregate with a timeout condition are
    /// split into pre/post segments around the aggregator instead of being
    /// compiled into one linear pipeline (see `start_route`).
    ///
    /// # Errors
    /// Fails if the route id is already registered or any step/layer
    /// resolution fails.
    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
        let route_id = definition.route_id().to_string();

        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, "Adding route to controller");

        // Snapshot definition metadata before destructuring consumes it.
        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        let producer_ctx = self.build_producer_context()?;

        // Aggregate-with-timeout routes cannot run as a single linear
        // pipeline because the aggregator emits on its own schedule, so the
        // steps before and after it are compiled separately.
        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
            Some((idx, agg_config)) => {
                // Split into pre-aggregate / aggregate / post-aggregate.
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // The main pipeline stays empty; execution happens through
                // the split segments instead.
                vec![]
            }
            None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
        };
        let route_id_for_tracing = route_id.clone();
        let mut pipeline = if processors_with_contracts.is_empty() {
            // Covers both a step-less route and the aggregate-split case.
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        // Optional circuit breaker wraps the step pipeline.
        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // A route-level error handler wins over the controller-wide default.
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Unit-of-work layer is outermost; its counter tracks in-flight
        // exchanges for this route.
        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            );
            let (uow_layer, counter) =
                self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        self.routes.insert(
            route_id.clone(),
            ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
            },
        );

        Ok(())
    }
582
    /// Compiles a `RouteDefinition` into a standalone pipeline without
    /// registering it — intended for hot-reload, where the result is then
    /// installed via `swap_pipeline`.
    ///
    /// If a route with the same id is already registered and carries an
    /// in-flight counter, that counter is reused so UoW accounting stays
    /// continuous across the swap. Layering order matches `add_route`.
    ///
    /// # Errors
    /// Propagates any step/layer resolution failure.
    pub fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<BoxProcessor, CamelError> {
        let route_id = def.route_id().to_string();

        let producer_ctx = self.build_producer_context()?;

        let processors_with_contracts =
            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
        let mut pipeline = compose_traced_pipeline_with_contracts(
            processors_with_contracts,
            &route_id,
            self.tracing_enabled,
            self.tracer_detail_level.clone(),
            self.tracer_metrics.clone(),
        );

        // Circuit breaker wraps the step pipeline first.
        if let Some(cb_config) = def.circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Route-level error handler, falling back to the global default.
        let eh_config = def
            .error_handler
            .clone()
            .or_else(|| self.global_error_handler.clone());
        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        if let Some(uow_config) = &def.unit_of_work {
            // Reuse the live route's counter (if any) so in-flight counts
            // survive the pipeline swap.
            let existing_counter = self
                .routes
                .get(&route_id)
                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));

            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            );

            let (uow_layer, _counter) = self.resolve_uow_layer(
                uow_config,
                &producer_ctx,
                &component_ctx,
                existing_counter,
            )?;

            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
        }

        Ok(pipeline)
    }
653
654 pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
660 let managed = self.routes.get(route_id).ok_or_else(|| {
661 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
662 })?;
663 if handle_is_running(&managed.consumer_handle)
664 || handle_is_running(&managed.pipeline_handle)
665 {
666 return Err(CamelError::RouteError(format!(
667 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
668 route_id,
669 inferred_lifecycle_label(managed)
670 )));
671 }
672 self.routes.remove(route_id);
673 info!(route_id = %route_id, "Route removed from controller");
674 Ok(())
675 }
676
    /// Number of routes currently registered (running or not).
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }
681
682 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
683 self.routes.get(route_id).map(|r| {
684 r.in_flight
685 .as_ref()
686 .map_or(0, |c| c.load(Ordering::Relaxed))
687 })
688 }
689
690 pub fn route_exists(&self, route_id: &str) -> bool {
692 self.routes.contains_key(route_id)
693 }
694
695 pub fn route_ids(&self) -> Vec<String> {
697 self.routes.keys().cloned().collect()
698 }
699
700 pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
701 self.routes
702 .get(route_id)
703 .and_then(|m| m.definition.source_hash())
704 }
705
706 pub fn auto_startup_route_ids(&self) -> Vec<String> {
708 let mut pairs: Vec<(String, i32)> = self
709 .routes
710 .iter()
711 .filter(|(_, managed)| managed.definition.auto_startup())
712 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
713 .collect();
714 pairs.sort_by_key(|(_, order)| *order);
715 pairs.into_iter().map(|(id, _)| id).collect()
716 }
717
718 pub fn shutdown_route_ids(&self) -> Vec<String> {
720 let mut pairs: Vec<(String, i32)> = self
721 .routes
722 .iter()
723 .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
724 .collect();
725 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
726 pairs.into_iter().map(|(id, _)| id).collect()
727 }
728
729 pub fn swap_pipeline(
734 &self,
735 route_id: &str,
736 new_pipeline: BoxProcessor,
737 ) -> Result<(), CamelError> {
738 let managed = self
739 .routes
740 .get(route_id)
741 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
742
743 if managed.aggregate_split.is_some() {
744 tracing::warn!(
745 route_id = %route_id,
746 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
747 );
748 }
749
750 managed
751 .pipeline
752 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
753 info!(route_id = %route_id, "Pipeline swapped atomically");
754 Ok(())
755 }
756
757 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
759 self.routes.get(route_id).map(|r| r.from_uri.clone())
760 }
761
762 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
767 self.routes
768 .get(route_id)
769 .map(|r| r.pipeline.load().0.clone())
770 }
771
    /// Delegates the full stop sequence (cancel tokens, drain, join tasks)
    /// to the shared consumer-management helper.
    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
    }
776
    /// Reload-specific alias for `start_route`, kept so reload call sites
    /// read distinctly from normal lifecycle calls.
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }
780
    /// Reload-specific alias for `stop_route`; see `start_route_reload`.
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
784}
785
786#[async_trait::async_trait]
787impl RouteController for DefaultRouteController {
    /// Starts a registered route: spawns its consumer task and pipeline
    /// task, wiring them together with a bounded mpsc channel.
    ///
    /// Aggregate-with-timeout routes take a dedicated path that runs
    /// pre-pipeline → aggregator → post-pipeline and additionally services
    /// aggregator-emitted "late" exchanges (timeout completions).
    ///
    /// # Errors
    /// Fails if the route is unknown, is suspended (use `resume_route`), or
    /// is in an inconsistent half-running state. Starting an already
    /// fully-running route is an idempotent no-op.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Phase 1: lifecycle validation, scoped so the mutable borrow ends
        // before the later accesses.
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            if consumer_running && pipeline_running {
                // Already fully started.
                return Ok(());
            }
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
        }

        info!(route_id = %route_id, "Starting route");

        // Phase 2: snapshot what the spawned tasks need.
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            ),
        )?;

        // Explicit route-level concurrency overrides the consumer default.
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Consumer → pipeline channel, bounded to 256 envelopes.
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Dedicated execution path for aggregate-with-timeout routes.
        if let Some(split) = managed.aggregate_split.as_ref() {
            // Channel for exchanges the aggregator completes on timeout.
            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);

            let route_cancel_clone = pipeline_cancel.clone();
            let svc = AggregatorService::new(
                split.agg_config.clone(),
                late_tx,
                Arc::clone(&self.languages),
                route_cancel_clone,
            );
            let agg = Arc::new(std::sync::Mutex::new(svc));

            managed.agg_service = Some(Arc::clone(&agg));

            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
            let pre_pipeline = Arc::clone(&split.pre_pipeline);
            let post_pipeline = Arc::clone(&split.post_pipeline);

            let consumer_handle = super::consumer_management::spawn_consumer_task(
                route_id.to_string(),
                consumer,
                consumer_ctx,
                crash_notifier,
                runtime_for_consumer,
                false,
            );

            let pipeline_handle = tokio::spawn(async move {
                loop {
                    tokio::select! {
                        // `biased` prioritizes draining late (timed-out)
                        // aggregations over new inbound envelopes.
                        biased;

                        late_ex = async {
                            let mut rx = late_rx.lock().await;
                            rx.recv().await
                        } => {
                            match late_ex {
                                Some(ex) => {
                                    // Timed-out aggregation: run only the
                                    // post-aggregate segment.
                                    let pipe = post_pipeline.load();
                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
                                    }
                                }
                                None => return,
                            }
                        }

                        envelope_opt = rx.recv() => {
                            match envelope_opt {
                                Some(envelope) => {
                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
                                    // Pre-aggregate segment first.
                                    let pre_pipe = pre_pipeline.load();
                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
                                        Ok(ex) => ex,
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                            continue;
                                        }
                                    };

                                    // Clone the aggregator out of the mutex so
                                    // the std lock is not held across the await.
                                    let ex = {
                                        let cloned_svc = agg
                                            .lock()
                                            .expect("mutex poisoned: another thread panicked while holding this lock")
                                            .clone();
                                        cloned_svc.oneshot(ex).await
                                    };

                                    match ex {
                                        Ok(ex) => {
                                            if !is_pending(&ex) {
                                                // Aggregation completed inline:
                                                // run the post segment now.
                                                let post_pipe = post_pipeline.load();
                                                let out = post_pipe.0.clone().oneshot(ex).await;
                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
                                            } else if let Some(tx) = reply_tx {
                                                // Still aggregating: reply with
                                                // the pending exchange.
                                                let _ = tx.send(Ok(ex));
                                            }
                                        }
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                        }
                                    }
                                }
                                None => return,
                            }
                        }

                        _ = pipeline_cancel.cancelled() => {
                            // Shutdown: force-complete open aggregations, then
                            // drain any late exchanges already queued.
                            {
                                let guard = agg
                                    .lock()
                                    .expect("mutex poisoned: another thread panicked while holding this lock");
                                guard.force_complete_all();
                            }
                            let mut rx_guard = late_rx.lock().await;
                            while let Ok(late_ex) = rx_guard.try_recv() {
                                let pipe = post_pipeline.load();
                                let _ = pipe.0.clone().oneshot(late_ex).await;
                            }
                            break;
                        }
                    }
                }
            });

            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist");
            managed.consumer_handle = Some(consumer_handle);
            managed.pipeline_handle = Some(pipeline_handle);
            managed.channel_sender = Some(tx_for_storage);

            info!(route_id = %route_id, "Route started (aggregate with timeout)");
            return Ok(());
        }
        // Standard (non-aggregate) path.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            false,
        );

        let pipeline_handle = match effective_concurrency {
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Load the current (hot-swappable) pipeline snapshot.
                        let mut pipeline = pipeline.load().0.clone();

                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            ConcurrencyModel::Concurrent { max } => {
                // Optional semaphore bounds concurrent in-flight exchanges.
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, },
                            _ = pipeline_cancel.cancelled() => {
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        // One task per exchange; the permit (if any) is held
                        // for its duration.
                        tokio::spawn(async move {
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            let mut pipe = pipe_ref.load().0.clone();

                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }
1102
    /// Stops both the consumer and pipeline tasks for the route.
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }
1106
    /// Stops then restarts the route. The 100 ms pause gives the spawned
    /// tasks a moment to fully wind down before the consumer is recreated.
    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
        self.start_route(route_id).await
    }
1112
1113 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1114 let managed = self
1116 .routes
1117 .get_mut(route_id)
1118 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1119
1120 let consumer_running = handle_is_running(&managed.consumer_handle);
1121 let pipeline_running = handle_is_running(&managed.pipeline_handle);
1122
1123 if !consumer_running || !pipeline_running {
1125 return Err(CamelError::RouteError(format!(
1126 "Cannot suspend route '{}' with execution lifecycle {}",
1127 route_id,
1128 inferred_lifecycle_label(managed)
1129 )));
1130 }
1131
1132 info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1133
1134 let managed = self
1136 .routes
1137 .get_mut(route_id)
1138 .expect("invariant: route must exist after prior existence check");
1139 managed.consumer_cancel_token.cancel();
1140
1141 let managed = self
1143 .routes
1144 .get_mut(route_id)
1145 .expect("invariant: route must exist after prior existence check");
1146 let consumer_handle = managed.consumer_handle.take();
1147
1148 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1150 if let Some(handle) = consumer_handle {
1151 let _ = handle.await;
1152 }
1153 })
1154 .await;
1155
1156 if timeout_result.is_err() {
1157 warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1158 }
1159
1160 let managed = self
1162 .routes
1163 .get_mut(route_id)
1164 .expect("invariant: route must exist after prior existence check");
1165
1166 managed.consumer_cancel_token = CancellationToken::new();
1168
1169 info!(route_id = %route_id, "Route suspended (pipeline still running)");
1170 Ok(())
1171 }
1172
    /// Resumes a suspended route by spawning a fresh consumer task that
    /// feeds the still-running pipeline through the stored channel sender.
    ///
    /// # Errors
    /// Fails if the route is unknown, is not in the Suspended state
    /// (consumer stopped, pipeline running), or has lost its channel
    /// sender.
    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        let managed = self
            .routes
            .get(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        // Suspended == consumer stopped AND pipeline still running.
        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);
        if consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        // The pipeline task still holds the receiving end of this channel.
        let sender = managed.channel_sender.clone().ok_or_else(|| {
            CamelError::RouteError("Suspended route has no channel sender".into())
        })?;

        let from_uri = managed.from_uri.clone();

        info!(route_id = %route_id, "Resuming route (spawning consumer only)");

        let (consumer, _) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            ),
        )?;

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Child of the fresh route-level token installed during suspend.
        let consumer_cancel = managed.consumer_cancel_token.child_token();

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());

        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            true,
        );

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);

        info!(route_id = %route_id, "Route resumed");
        Ok(())
    }
1247
1248 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1249 let route_ids: Vec<String> = {
1252 let mut pairs: Vec<_> = self
1253 .routes
1254 .iter()
1255 .filter(|(_, r)| r.definition.auto_startup())
1256 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1257 .collect();
1258 pairs.sort_by_key(|(_, order)| *order);
1259 pairs.into_iter().map(|(id, _)| id).collect()
1260 };
1261
1262 info!("Starting {} auto-startup routes", route_ids.len());
1263
1264 let mut errors: Vec<String> = Vec::new();
1266 for route_id in route_ids {
1267 if let Err(e) = self.start_route(&route_id).await {
1268 errors.push(format!("Route '{}': {}", route_id, e));
1269 }
1270 }
1271
1272 if !errors.is_empty() {
1273 return Err(CamelError::RouteError(format!(
1274 "Failed to start routes: {}",
1275 errors.join(", ")
1276 )));
1277 }
1278
1279 info!("All auto-startup routes started");
1280 Ok(())
1281 }
1282
1283 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1284 let route_ids: Vec<String> = {
1286 let mut pairs: Vec<_> = self
1287 .routes
1288 .iter()
1289 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1290 .collect();
1291 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1292 pairs.into_iter().map(|(id, _)| id).collect()
1293 };
1294
1295 info!("Stopping {} routes", route_ids.len());
1296
1297 for route_id in route_ids {
1298 let _ = self.stop_route(&route_id).await;
1299 }
1300
1301 info!("All routes stopped");
1302 Ok(())
1303 }
1304}
1305
1306#[cfg(test)]
1307#[path = "route_controller_tests.rs"]
1308mod tests;