Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::{Arc, Weak};
8use std::time::Duration;
9
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, ServiceExt};
13use tracing::{info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::metrics::MetricsCollector;
17#[allow(unused_imports)]
18use camel_api::{
19    BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
20    NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeHandle,
21};
22use camel_component_api::{Consumer, ConsumerContext, consumer::ExchangeEnvelope};
23use camel_processor::aggregator::AggregatorService;
24pub use camel_processor::aggregator::SharedLanguageRegistry;
25
26use crate::health_registry::HealthCheckRegistry;
27use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
28use crate::lifecycle::adapters::route_compiler::compose_pipeline;
29use crate::lifecycle::adapters::route_compiler_ext::{RouteCompilerExt, build_eh_config_pipeline};
30pub(crate) use crate::lifecycle::adapters::route_helpers::PreparedRoute;
31use crate::lifecycle::adapters::route_helpers::{
32    AggregateSplitInfo, CrashNotification, ManagedRoute, find_top_level_aggregate_requiring_split,
33    handle_is_running, inferred_lifecycle_label, is_pending,
34};
35#[cfg(test)]
36pub(super) use crate::lifecycle::adapters::route_helpers::{
37    emit_start_route_event, set_start_route_event_hook,
38};
39use crate::lifecycle::adapters::route_registry::RouteRegistry;
40use crate::lifecycle::adapters::route_runtime_state;
41use crate::lifecycle::adapters::step_compilers::CompiledStep;
42use crate::lifecycle::application::route_definition::{BuilderStep, RouteDefinition};
43use crate::shared::components::domain::Registry;
44use crate::shared::observability::domain::{DetailLevel, TracerConfig};
45use camel_bean::BeanRegistry;
46
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
///
/// The optional fields start out as `None` and tracing starts disabled; they
/// are filled in through the `set_*` / `with_*` methods of this type.
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    pub(super) routes: RouteRegistry,
    /// Reference to the component registry for resolving endpoints.
    pub(super) registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    pub(super) languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    pub(super) beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    ///
    /// Held as a `Weak`, so it has to be upgraded before every use.
    pub(super) runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    pub(super) global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    pub(super) crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    pub(super) tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    pub(super) tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    pub(super) tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform service passed to component contexts and to pipeline construction.
    pub(super) platform_service: Arc<dyn PlatformService>,
    /// Optional function invoker, used to commit/discard staged functions when
    /// adding routes and to unregister functions when removing them.
    pub(super) function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Optional shared health-check registry. When `None`, the
    /// `health_registry()` accessor builds an isolated fallback on each call.
    pub(super) health_registry: Option<Arc<HealthCheckRegistry>>,
}
79
80impl DefaultRouteController {
81    pub(super) fn health_registry(&self) -> Arc<HealthCheckRegistry> {
82        self.health_registry.clone().unwrap_or_else(|| {
83            warn!("health_registry not configured — creating isolated fallback");
84            Arc::new(HealthCheckRegistry::new(Duration::from_secs(5)))
85        })
86    }
87
88    /// Create a new `DefaultRouteController` with the given registry.
89    pub fn new(
90        registry: Arc<std::sync::Mutex<Registry>>,
91        platform_service: Arc<dyn PlatformService>,
92    ) -> Self {
93        Self::with_beans_and_platform_service(
94            registry,
95            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
96            platform_service,
97        )
98    }
99
100    /// Create a new `DefaultRouteController` with shared bean registry.
101    pub fn with_beans(
102        registry: Arc<std::sync::Mutex<Registry>>,
103        beans: Arc<std::sync::Mutex<BeanRegistry>>,
104    ) -> Self {
105        Self::with_beans_and_platform_service(
106            registry,
107            beans,
108            Arc::new(NoopPlatformService::default()),
109        )
110    }
111
112    fn with_beans_and_platform_service(
113        registry: Arc<std::sync::Mutex<Registry>>,
114        beans: Arc<std::sync::Mutex<BeanRegistry>>,
115        platform_service: Arc<dyn PlatformService>,
116    ) -> Self {
117        Self {
118            routes: RouteRegistry::new(),
119            registry,
120            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
121            beans,
122            runtime: None,
123            global_error_handler: None,
124            crash_notifier: None,
125            tracing_enabled: false,
126            tracer_detail_level: DetailLevel::Minimal,
127            tracer_metrics: None,
128            platform_service,
129            function_invoker: None,
130            health_registry: None,
131        }
132    }
133
134    /// Create a new `DefaultRouteController` with shared language registry.
135    pub fn with_languages(
136        registry: Arc<std::sync::Mutex<Registry>>,
137        languages: SharedLanguageRegistry,
138        platform_service: Arc<dyn PlatformService>,
139    ) -> Self {
140        Self {
141            routes: RouteRegistry::new(),
142            registry,
143            languages,
144            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
145            runtime: None,
146            global_error_handler: None,
147            crash_notifier: None,
148            tracing_enabled: false,
149            tracer_detail_level: DetailLevel::Minimal,
150            tracer_metrics: None,
151            platform_service,
152            function_invoker: None,
153            health_registry: None,
154        }
155    }
156
157    pub fn with_languages_and_beans(
158        registry: Arc<std::sync::Mutex<Registry>>,
159        languages: SharedLanguageRegistry,
160        platform_service: Arc<dyn PlatformService>,
161        beans: Arc<std::sync::Mutex<BeanRegistry>>,
162    ) -> Self {
163        Self {
164            routes: RouteRegistry::new(),
165            registry,
166            languages,
167            beans,
168            runtime: None,
169            global_error_handler: None,
170            crash_notifier: None,
171            tracing_enabled: false,
172            tracer_detail_level: DetailLevel::Minimal,
173            tracer_metrics: None,
174            platform_service,
175            function_invoker: None,
176            health_registry: None,
177        }
178    }
179
180    pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
181        self.function_invoker = Some(function_invoker);
182        self
183    }
184
185    pub fn set_health_registry(&mut self, registry: Arc<HealthCheckRegistry>) {
186        self.health_registry = Some(registry);
187    }
188
189    pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
190        self.function_invoker = Some(invoker);
191    }
192
193    /// Set runtime handle for ProducerContext creation.
194    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
195        self.runtime = Some(Arc::downgrade(&runtime));
196    }
197
198    /// Set the crash notifier for supervision.
199    ///
200    /// When set, the controller will send a [`CrashNotification`] whenever
201    /// a consumer crashes.
202    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
203        self.crash_notifier = Some(tx);
204    }
205
206    /// Set a global error handler applied to all routes without a per-route handler.
207    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
208        self.global_error_handler = Some(config);
209    }
210
211    /// Configure tracing for this route controller.
212    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
213        self.tracing_enabled = config.enabled;
214        self.tracer_detail_level = config.detail_level.clone();
215        self.tracer_metrics = config.metrics_collector.clone();
216    }
217
218    fn build_producer_context(&self, route_id: &str) -> Result<ProducerContext, CamelError> {
219        let mut producer_ctx = ProducerContext::new().with_route_id(route_id);
220        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
221            producer_ctx = producer_ctx.with_runtime(runtime);
222        }
223        Ok(producer_ctx)
224    }
225
226    /// Create a transient [`RouteCompilerExt`] from this controller's fields.
227    fn route_compiler_ext(&self) -> RouteCompilerExt<'_> {
228        RouteCompilerExt {
229            registry: &self.registry,
230            languages: &self.languages,
231            beans: &self.beans,
232            function_invoker: &self.function_invoker,
233            tracing_enabled: self.tracing_enabled,
234            tracer_detail_level: &self.tracer_detail_level,
235            tracer_metrics: &self.tracer_metrics,
236            platform_service: &self.platform_service,
237            runtime: &self.runtime,
238            global_error_handler: &self.global_error_handler,
239            health_registry: &self.health_registry,
240            route_registry: &self.routes,
241        }
242    }
243
244    /// Resolve BuilderSteps into BoxProcessors.
245    pub(crate) fn resolve_steps(
246        &self,
247        steps: Vec<BuilderStep>,
248        producer_ctx: &ProducerContext,
249        registry: &Arc<std::sync::Mutex<Registry>>,
250        route_id: Option<&str>,
251        staging_mode: &super::step_resolution::FunctionStagingMode,
252    ) -> Result<Vec<CompiledStep>, CamelError> {
253        let component_ctx = Arc::new(ControllerComponentContext::new(
254            Arc::clone(registry),
255            Arc::clone(&self.languages),
256            self.tracer_metrics
257                .clone()
258                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
259            Arc::clone(&self.platform_service),
260            self.health_registry(),
261            route_id.map(|s| s.to_string()),
262        ));
263        let rt: Arc<dyn camel_component_api::RuntimeObservability> =
264            Arc::clone(&component_ctx) as Arc<_>;
265
266        super::step_resolution::resolve_steps(
267            steps,
268            producer_ctx,
269            rt,
270            registry,
271            &self.languages,
272            &self.beans,
273            self.function_invoker.clone(),
274            component_ctx,
275            route_id,
276            staging_mode,
277        )
278    }
279
280    /// Add a route definition to the controller.
281    ///
282    /// Steps are resolved immediately using the registry.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if:
287    /// - A route with the same ID already exists
288    /// - Step resolution fails
289    pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
290        let route_id = definition.route_id().to_string();
291
292        if self.routes.contains_key(&route_id) {
293            return Err(CamelError::RouteError(format!(
294                "Route '{}' already exists",
295                route_id
296            )));
297        }
298
299        info!(route_id = %route_id, "Adding route to controller");
300
301        let prepared = match self.build_managed_route(
302            definition,
303            &super::step_resolution::FunctionStagingMode::DirectAdd,
304        ) {
305            Ok(prepared) => prepared,
306            Err(err) => {
307                self.discard_function_staging();
308                return Err(err);
309            }
310        };
311
312        if let Some(invoker) = &self.function_invoker
313            && let Err(err) = invoker.commit_staged().await
314        {
315            invoker.discard_staging(0);
316            return Err(CamelError::Config(err.to_string()));
317        }
318
319        self.routes
320            .insert(prepared.route_id.clone(), prepared.managed);
321
322        Ok(())
323    }
324
325    fn build_managed_route(
326        &self,
327        definition: RouteDefinition,
328        staging_mode: &super::step_resolution::FunctionStagingMode,
329    ) -> Result<PreparedRoute, CamelError> {
330        let route_id = definition.route_id().to_string();
331
332        let definition_info = definition.to_info();
333        let RouteDefinition {
334            from_uri,
335            steps,
336            error_handler,
337            circuit_breaker,
338            security_policy,
339            security_authenticator,
340            unit_of_work,
341            concurrency,
342            ..
343        } = definition;
344
345        let producer_ctx = self.build_producer_context(&route_id)?;
346
347        let mut aggregate_split: Option<AggregateSplitInfo> = None;
348        let processors_with_contracts = match find_top_level_aggregate_requiring_split(&steps) {
349            Some((idx, agg_config)) => {
350                let mut pre_steps = steps;
351                let mut rest = pre_steps.split_off(idx);
352                let _agg_step = rest.remove(0);
353                let post_steps = rest;
354
355                let pre_pairs = self.resolve_steps(
356                    pre_steps,
357                    &producer_ctx,
358                    &self.registry,
359                    Some(&route_id),
360                    staging_mode,
361                )?;
362                let pre_pipeline =
363                    super::pipeline_runtime::new_shared_pipeline(compose_pipeline(pre_pairs));
364
365                let post_pairs = self.resolve_steps(
366                    post_steps,
367                    &producer_ctx,
368                    &self.registry,
369                    Some(&route_id),
370                    staging_mode,
371                )?;
372                let post_pipeline =
373                    super::pipeline_runtime::new_shared_pipeline(compose_pipeline(post_pairs));
374
375                aggregate_split = Some(AggregateSplitInfo {
376                    pre_pipeline,
377                    agg_config,
378                    post_pipeline,
379                });
380
381                vec![]
382            }
383            None => self.resolve_steps(
384                steps,
385                &producer_ctx,
386                &self.registry,
387                Some(&route_id),
388                staging_mode,
389            )?,
390        };
391        let route_id_for_tracing = route_id.clone();
392        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
393
394        let mut pipeline = build_eh_config_pipeline(
395            eh_config.as_ref(),
396            Arc::clone(&self.registry),
397            Arc::clone(&self.languages),
398            self.tracer_metrics.clone(),
399            Arc::clone(&self.platform_service),
400            self.health_registry(),
401            &route_id_for_tracing,
402            &producer_ctx,
403            processors_with_contracts,
404            self.tracing_enabled,
405            self.tracer_detail_level.clone(),
406            security_policy.clone(),
407            circuit_breaker,
408        )?;
409
410        let uow_counter = if let Some(uow_config) = &unit_of_work {
411            let component_ctx = Arc::new(ControllerComponentContext::new(
412                Arc::clone(&self.registry),
413                Arc::clone(&self.languages),
414                self.tracer_metrics
415                    .clone()
416                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
417                Arc::clone(&self.platform_service),
418                self.health_registry(),
419                Some(route_id.clone()),
420            ));
421            let rt: Arc<dyn camel_component_api::RuntimeObservability> =
422                Arc::clone(&component_ctx) as Arc<_>;
423            let (uow_layer, counter) = super::route_compiler_ext::resolve_uow_layer(
424                uow_config,
425                &producer_ctx,
426                rt,
427                component_ctx.as_ref(),
428                None,
429            )?;
430            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
431            Some(counter)
432        } else {
433            None
434        };
435
436        Ok(PreparedRoute {
437            route_id,
438            managed: ManagedRoute {
439                definition: definition_info,
440                from_uri,
441                pipeline: super::pipeline_runtime::new_shared_pipeline(pipeline),
442                concurrency,
443                consumer_handle: None,
444                pipeline_handle: None,
445                consumer_cancel_token: CancellationToken::new(),
446                pipeline_cancel_token: CancellationToken::new(),
447                channel_sender: None,
448                in_flight: uow_counter,
449                aggregate_split,
450                agg_service: None,
451                compiled: route_runtime_state::CompiledRoute {
452                    security_policy,
453                    security_authenticator,
454                },
455            },
456        })
457    }
458
459    pub(crate) fn insert_prepared_route(
460        &mut self,
461        prepared: PreparedRoute,
462    ) -> Result<(), CamelError> {
463        if self.routes.contains_key(&prepared.route_id) {
464            return Err(CamelError::RouteError(format!(
465                "Route '{}' already exists",
466                prepared.route_id
467            )));
468        }
469        self.routes
470            .insert(prepared.route_id.clone(), prepared.managed);
471        Ok(())
472    }
473
474    pub async fn add_route_with_generation(
475        &mut self,
476        definition: RouteDefinition,
477        generation: u64,
478    ) -> Result<(), CamelError> {
479        let route_id = definition.route_id().to_string();
480
481        if self.routes.contains_key(&route_id) {
482            return Err(CamelError::RouteError(format!(
483                "Route '{}' already exists",
484                route_id
485            )));
486        }
487
488        info!(route_id = %route_id, generation, "Adding route to controller with generation");
489
490        let prepared = self.build_managed_route(
491            definition,
492            &super::step_resolution::FunctionStagingMode::HotReload { generation },
493        )?;
494
495        self.routes
496            .insert(prepared.route_id.clone(), prepared.managed);
497
498        Ok(())
499    }
500
501    pub(crate) fn prepare_route_definition_with_generation(
502        &self,
503        definition: RouteDefinition,
504        generation: u64,
505    ) -> Result<PreparedRoute, CamelError> {
506        self.build_managed_route(
507            definition,
508            &super::step_resolution::FunctionStagingMode::HotReload { generation },
509        )
510    }
511
512    pub async fn remove_route_preserving_functions(
513        &mut self,
514        route_id: &str,
515    ) -> Result<(), CamelError> {
516        let managed = self.routes.get(route_id).ok_or_else(|| {
517            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
518        })?;
519        if handle_is_running(&managed.consumer_handle)
520            || handle_is_running(&managed.pipeline_handle)
521        {
522            return Err(CamelError::RouteError(format!(
523                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
524                route_id,
525                inferred_lifecycle_label(managed)
526            )));
527        }
528        self.routes.remove(route_id);
529        if let Some(reg) = &self.health_registry {
530            reg.unregister_for_route(route_id);
531        }
532        info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
533        Ok(())
534    }
535
536    /// Compile a route definition into a processor pipeline, without adding it
537    /// to the controller. Used for validation and testing.
538    pub fn compile_route_definition(
539        &self,
540        def: RouteDefinition,
541    ) -> Result<BoxProcessor, CamelError> {
542        self.route_compiler_ext().compile_route_definition(def)
543    }
544
545    /// Compile a route definition with a specific generation (for hot-reload).
546    pub fn compile_route_definition_with_generation(
547        &self,
548        def: RouteDefinition,
549        generation: u64,
550    ) -> Result<BoxProcessor, CamelError> {
551        self.route_compiler_ext()
552            .compile_route_definition_with_generation(def, generation)
553    }
554
555    /// Remove a route from the controller map.
556    ///
557    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
558    /// Returns an error if the route is still running or does not exist.
559    /// Does not cancel any running tasks — call `stop_route` first.
560    pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
561        let managed = self.routes.get(route_id).ok_or_else(|| {
562            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
563        })?;
564        if handle_is_running(&managed.consumer_handle)
565            || handle_is_running(&managed.pipeline_handle)
566        {
567            return Err(CamelError::RouteError(format!(
568                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
569                route_id,
570                inferred_lifecycle_label(managed)
571            )));
572        }
573        if let Some(invoker) = &self.function_invoker {
574            for (id, rid) in self.collect_function_refs(route_id) {
575                if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
576                    warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
577                }
578            }
579        }
580        self.routes.remove(route_id);
581        if let Some(reg) = &self.health_registry {
582            reg.unregister_for_route(route_id);
583        }
584        info!(route_id = %route_id, "Route removed from controller");
585        Ok(())
586    }
587
588    fn collect_function_refs(
589        &self,
590        route_id: &str,
591    ) -> Vec<(camel_api::FunctionId, Option<String>)> {
592        self.function_invoker
593            .as_ref()
594            .map(|invoker| invoker.function_refs_for_route(route_id))
595            .unwrap_or_default()
596    }
597
598    fn discard_function_staging(&self) {
599        if let Some(invoker) = &self.function_invoker {
600            invoker.discard_staging(0);
601        }
602    }
603
    /// Returns the number of routes in the controller.
    ///
    /// Delegates to the route registry.
    pub fn route_count(&self) -> usize {
        self.routes.route_count()
    }
608
    /// Returns the in-flight count the route registry reports for `route_id`.
    ///
    /// The meaning of `None` is decided by the registry; presumably the route
    /// is unknown or has no in-flight counter — confirm against `RouteRegistry`.
    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
        self.routes.in_flight_count(route_id)
    }
612
    /// Returns `true` if a route with the given ID exists.
    ///
    /// Delegates to the route registry.
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.route_exists(route_id)
    }
617
    /// Returns all route IDs, as reported by the route registry.
    pub fn route_ids(&self) -> Vec<String> {
        self.routes.route_ids()
    }
622
    /// Returns the source hash the route registry holds for `route_id`, if any.
    ///
    /// NOTE(review): how the hash is computed is not visible here — see
    /// `RouteRegistry::route_source_hash`.
    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.routes.route_source_hash(route_id)
    }
626
    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
    ///
    /// Filtering and ordering are performed by the route registry.
    pub fn auto_startup_route_ids(&self) -> Vec<String> {
        self.routes.auto_startup_route_ids()
    }
631
    /// Returns route IDs sorted by shutdown order (startup order descending).
    ///
    /// Ordering is performed by the route registry.
    pub fn shutdown_route_ids(&self) -> Vec<String> {
        self.routes.shutdown_route_ids()
    }
636
637    /// Atomically swap the pipeline of a route.
638    ///
639    /// In-flight requests finish with the old pipeline (kept alive by Arc).
640    /// New requests immediately use the new pipeline.
641    pub fn swap_pipeline(
642        &self,
643        route_id: &str,
644        new_pipeline: BoxProcessor,
645    ) -> Result<(), CamelError> {
646        let managed = self
647            .routes
648            .get(route_id)
649            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
650
651        if managed.aggregate_split.is_some() {
652            tracing::warn!(
653                route_id = %route_id,
654                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
655            );
656        }
657
658        super::pipeline_runtime::swap_pipeline(&managed.pipeline, new_pipeline);
659        info!(route_id = %route_id, "Pipeline swapped atomically");
660        Ok(())
661    }
662
    /// Returns the from_uri of a route, if it exists.
    ///
    /// Delegates to the route registry.
    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
        self.routes.route_from_uri(route_id)
    }
667
    /// Get a clone of the current pipeline for a route.
    ///
    /// This is useful for testing and introspection.
    /// Returns `None` if the route doesn't exist.
    ///
    /// Delegates to the route registry.
    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
        self.routes.get_pipeline(route_id)
    }
675
    /// Internal stop implementation: forwards to the route registry's
    /// `stop_route` for `route_id` and returns its result unchanged.
    pub(super) async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.routes.stop_route(route_id).await
    }
680
    /// Starts a route on behalf of the reload path.
    ///
    /// Currently a plain alias: it awaits `self.start_route(route_id)` and
    /// returns its result unchanged.
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }
684
    /// Stops a route on behalf of the reload path.
    ///
    /// Currently a plain alias: it awaits `self.stop_route(route_id)` and
    /// returns its result unchanged.
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
688}
689
690// ── Aggregator route helpers ──
691
692impl DefaultRouteController {
693    /// Start a route with an aggregate split (pre-pipeline → aggregator → post-pipeline).
694    ///
695    /// Spawns a biased-select forward loop that routes exchanges through the
696    /// pre-pipeline, aggregator, and post-pipeline in sequence, with late-exchange
697    /// handling and force-completion on stop.
698    #[allow(clippy::too_many_arguments)]
699    pub(super) async fn start_aggregate_route(
700        &mut self,
701        route_id: &str,
702        split: AggregateSplitInfo,
703        consumer: Box<dyn Consumer>,
704        consumer_ctx: ConsumerContext,
705        mut rx: mpsc::Receiver<ExchangeEnvelope>,
706        crash_notifier: Option<mpsc::Sender<CrashNotification>>,
707        runtime_for_consumer: Option<Weak<dyn RuntimeHandle>>,
708        tx_for_storage: mpsc::Sender<ExchangeEnvelope>,
709        // Pipeline cancellation — a child of the managed route's pipeline_cancel_token.
710        pipeline_cancel: CancellationToken,
711    ) -> Result<(), CamelError> {
712        let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
713
714        let route_cancel_clone = pipeline_cancel.clone();
715        let svc = AggregatorService::new(
716            split.agg_config.clone(),
717            late_tx,
718            Arc::clone(&self.languages),
719            route_cancel_clone,
720        );
721        let agg = Arc::new(std::sync::Mutex::new(svc));
722
723        let pipeline_cancel_for_monitor = pipeline_cancel.clone();
724        let agg_for_monitor = Arc::clone(&agg);
725
726        {
727            let managed = self
728                .routes
729                .get_mut(route_id)
730                .expect("invariant: route must exist"); // allow-unwrap
731            managed.agg_service = Some(Arc::clone(&agg));
732        }
733
734        let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
735        let pre_pipeline = split.pre_pipeline;
736        let post_pipeline = split.post_pipeline;
737
738        // Spawn biased select forward loop
739        let pipeline_handle = tokio::spawn(async move {
740            loop {
741                tokio::select! {
742                    biased;
743
744                    late_ex = async {
745                        let mut rx = late_rx.lock().await;
746                        rx.recv().await
747                    } => {
748                        match late_ex {
749                            Some(ex) => {
750                                let pipe = post_pipeline.load();
751                                if let Err(e) = pipe.clone_inner().oneshot(ex).await {
752                                    tracing::warn!(error = %e, "late exchange post-pipeline failed");
753                                }
754                            }
755                            None => return,
756                        }
757                    }
758
759                    envelope_opt = rx.recv() => {
760                        match envelope_opt {
761                            Some(envelope) => {
762                                let ExchangeEnvelope { exchange, reply_tx } = envelope;
763                                let pre_pipe = pre_pipeline.load();
764                                let ex = match pre_pipe.clone_inner().oneshot(exchange).await {
765                                    Ok(ex) => ex,
766                                    Err(e) => {
767                                        if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
768                                        continue;
769                                    }
770                                };
771
772                                let ex = {
773                                    let cloned_svc = agg
774                                        .lock()
775                                        .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
776                                        .clone();
777                                    cloned_svc.oneshot(ex).await
778                                };
779
780                                match ex {
781                                    Ok(ex) => {
782                                        if !is_pending(&ex) {
783                                            let post_pipe = post_pipeline.load();
784                                            let out = post_pipe.clone_inner().oneshot(ex).await;
785                                            if let Some(tx) = reply_tx { let _ = tx.send(out); }
786                                        } else if let Some(tx) = reply_tx {
787                                            let _ = tx.send(Ok(ex));
788                                        }
789                                    }
790                                    Err(e) => {
791                                        if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
792                                    }
793                                }
794                            }
795                            None => return,
796                        }
797                    }
798
799                    _ = pipeline_cancel.cancelled() => {
800                        {
801                            let guard = agg
802                                .lock()
803                                .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
804                            guard.force_complete_all();
805                        }
806                        let mut rx_guard = late_rx.lock().await;
807                        while let Ok(late_ex) = rx_guard.try_recv() {
808                            let pipe = post_pipeline.load();
809                            let _ = pipe.clone_inner().oneshot(late_ex).await;
810                        }
811                        break;
812                    }
813                }
814            }
815        });
816        #[cfg(test)]
817        emit_start_route_event("pipeline_spawned");
818
819        // Start consumer after pipeline loop is spawned to avoid startup races
820        // where consumers emit exchanges before the route pipeline begins polling.
821        let consumer_handle = super::consumer_management::spawn_consumer_task(
822            route_id.to_string(),
823            consumer,
824            consumer_ctx,
825            crash_notifier,
826            runtime_for_consumer,
827            false,
828        );
829
830        // Extend the stored consumer handle through aggregate force-completion.
831        // While this monitor drains pending buckets, handle_is_running still reports
832        // the Route as running because forced exchanges may still be in post-pipeline.
833        let force_on_stop = agg_for_monitor
834            .lock()
835            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
836            .config()
837            .force_completion_on_stop;
838        let consumer_handle = tokio::spawn(async move {
839            let _ = consumer_handle.await;
840            if !pipeline_cancel_for_monitor.is_cancelled() {
841                let guard = agg_for_monitor
842                    .lock()
843                    .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
844                guard.force_complete_all();
845                drop(guard);
846                if force_on_stop {
847                    pipeline_cancel_for_monitor.cancel();
848                }
849            }
850        });
851        #[cfg(test)]
852        emit_start_route_event("consumer_spawned");
853
854        {
855            let managed = self
856                .routes
857                .get_mut(route_id)
858                .expect("invariant: route must exist"); // allow-unwrap
859            managed.consumer_handle = Some(consumer_handle);
860            managed.pipeline_handle = Some(pipeline_handle);
861            managed.channel_sender = Some(tx_for_storage);
862        }
863
864        info!(route_id = %route_id, "Route started (aggregate with timeout)");
865        Ok(())
866    }
867}
868
869#[cfg(test)]
870#[path = "route_controller_tests.rs"]
871mod tests;