Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::{Arc, Weak};
8use std::time::Duration;
9
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, ServiceExt};
13use tracing::{info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::metrics::MetricsCollector;
17#[allow(unused_imports)]
18use camel_api::{
19    BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
20    NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeHandle,
21};
22use camel_component_api::{Consumer, ConsumerContext, consumer::ExchangeEnvelope};
23use camel_processor::aggregator::AggregatorService;
24pub use camel_processor::aggregator::SharedLanguageRegistry;
25
26use crate::health_registry::HealthCheckRegistry;
27use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
28use crate::lifecycle::adapters::route_compiler::compose_pipeline;
29use crate::lifecycle::adapters::route_compiler_ext::{RouteCompilerExt, build_eh_config_pipeline};
30pub(crate) use crate::lifecycle::adapters::route_helpers::PreparedRoute;
31use crate::lifecycle::adapters::route_helpers::{
32    AggregateSplitInfo, CrashNotification, ManagedRoute, find_top_level_aggregate_requiring_split,
33    handle_is_running, inferred_lifecycle_label, is_pending,
34};
35#[cfg(test)]
36pub(super) use crate::lifecycle::adapters::route_helpers::{
37    emit_start_route_event, set_start_route_event_hook,
38};
39use crate::lifecycle::adapters::route_registry::RouteRegistry;
40use crate::lifecycle::adapters::route_runtime_state;
41use crate::lifecycle::application::route_definition::{BuilderStep, RouteDefinition};
42use crate::shared::components::domain::Registry;
43use crate::shared::observability::domain::{DetailLevel, TracerConfig};
44use camel_bean::BeanRegistry;
45
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
///
/// All fields are `pub(super)` so sibling modules of the lifecycle adapters can
/// read and update controller state directly.
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    pub(super) routes: RouteRegistry,
    /// Reference to the component registry for resolving endpoints.
    pub(super) registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    pub(super) languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    pub(super) beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    ///
    /// Held as a `Weak` reference, so the controller does not keep the runtime alive.
    pub(super) runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    pub(super) global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    pub(super) crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    pub(super) tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    pub(super) tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    pub(super) tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform service handed to component contexts and to the error-handler
    /// pipeline builder when routes are compiled.
    pub(super) platform_service: Arc<dyn PlatformService>,
    /// Optional function invoker; when present it is used to commit or discard
    /// staged functions on route addition and to unregister them on removal.
    pub(super) function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Optional health-check registry. When unset, `health_registry()` logs a
    /// warning and returns a freshly created, isolated registry instead.
    pub(super) health_registry: Option<Arc<HealthCheckRegistry>>,
}
78
79impl DefaultRouteController {
80    pub(super) fn health_registry(&self) -> Arc<HealthCheckRegistry> {
81        self.health_registry.clone().unwrap_or_else(|| {
82            warn!("health_registry not configured — creating isolated fallback");
83            Arc::new(HealthCheckRegistry::new(Duration::from_secs(5)))
84        })
85    }
86
87    /// Create a new `DefaultRouteController` with the given registry.
88    pub fn new(
89        registry: Arc<std::sync::Mutex<Registry>>,
90        platform_service: Arc<dyn PlatformService>,
91    ) -> Self {
92        Self::with_beans_and_platform_service(
93            registry,
94            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
95            platform_service,
96        )
97    }
98
99    /// Create a new `DefaultRouteController` with shared bean registry.
100    pub fn with_beans(
101        registry: Arc<std::sync::Mutex<Registry>>,
102        beans: Arc<std::sync::Mutex<BeanRegistry>>,
103    ) -> Self {
104        Self::with_beans_and_platform_service(
105            registry,
106            beans,
107            Arc::new(NoopPlatformService::default()),
108        )
109    }
110
111    fn with_beans_and_platform_service(
112        registry: Arc<std::sync::Mutex<Registry>>,
113        beans: Arc<std::sync::Mutex<BeanRegistry>>,
114        platform_service: Arc<dyn PlatformService>,
115    ) -> Self {
116        Self {
117            routes: RouteRegistry::new(),
118            registry,
119            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
120            beans,
121            runtime: None,
122            global_error_handler: None,
123            crash_notifier: None,
124            tracing_enabled: false,
125            tracer_detail_level: DetailLevel::Minimal,
126            tracer_metrics: None,
127            platform_service,
128            function_invoker: None,
129            health_registry: None,
130        }
131    }
132
133    /// Create a new `DefaultRouteController` with shared language registry.
134    pub fn with_languages(
135        registry: Arc<std::sync::Mutex<Registry>>,
136        languages: SharedLanguageRegistry,
137        platform_service: Arc<dyn PlatformService>,
138    ) -> Self {
139        Self {
140            routes: RouteRegistry::new(),
141            registry,
142            languages,
143            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
144            runtime: None,
145            global_error_handler: None,
146            crash_notifier: None,
147            tracing_enabled: false,
148            tracer_detail_level: DetailLevel::Minimal,
149            tracer_metrics: None,
150            platform_service,
151            function_invoker: None,
152            health_registry: None,
153        }
154    }
155
156    pub fn with_languages_and_beans(
157        registry: Arc<std::sync::Mutex<Registry>>,
158        languages: SharedLanguageRegistry,
159        platform_service: Arc<dyn PlatformService>,
160        beans: Arc<std::sync::Mutex<BeanRegistry>>,
161    ) -> Self {
162        Self {
163            routes: RouteRegistry::new(),
164            registry,
165            languages,
166            beans,
167            runtime: None,
168            global_error_handler: None,
169            crash_notifier: None,
170            tracing_enabled: false,
171            tracer_detail_level: DetailLevel::Minimal,
172            tracer_metrics: None,
173            platform_service,
174            function_invoker: None,
175            health_registry: None,
176        }
177    }
178
179    pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
180        self.function_invoker = Some(function_invoker);
181        self
182    }
183
    /// Set the health-check registry used by this controller.
    ///
    /// Replaces any previously configured registry. Once set, `health_registry()`
    /// returns clones of this `Arc` instead of creating an isolated fallback.
    pub fn set_health_registry(&mut self, registry: Arc<HealthCheckRegistry>) {
        self.health_registry = Some(registry);
    }
187
    /// Set the function invoker, replacing any previously configured one.
    pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
        self.function_invoker = Some(invoker);
    }
191
192    /// Set runtime handle for ProducerContext creation.
193    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
194        self.runtime = Some(Arc::downgrade(&runtime));
195    }
196
    /// Set the crash notifier for supervision.
    ///
    /// When set, the controller will send a [`CrashNotification`] whenever
    /// a consumer crashes.
    ///
    /// Replaces any previously configured sender.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }
204
    /// Set a global error handler applied to all routes without a per-route handler.
    ///
    /// The value is read when a route is built, where a route-level handler
    /// takes precedence over this one.
    /// NOTE(review): routes already built presumably keep the handler they were
    /// compiled with — confirm before relying on retroactive changes.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }
209
210    /// Configure tracing for this route controller.
211    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
212        self.tracing_enabled = config.enabled;
213        self.tracer_detail_level = config.detail_level.clone();
214        self.tracer_metrics = config.metrics_collector.clone();
215    }
216
217    fn build_producer_context(&self, route_id: &str) -> Result<ProducerContext, CamelError> {
218        let mut producer_ctx = ProducerContext::new().with_route_id(route_id);
219        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
220            producer_ctx = producer_ctx.with_runtime(runtime);
221        }
222        Ok(producer_ctx)
223    }
224
225    /// Create a transient [`RouteCompilerExt`] from this controller's fields.
226    fn route_compiler_ext(&self) -> RouteCompilerExt<'_> {
227        RouteCompilerExt {
228            registry: &self.registry,
229            languages: &self.languages,
230            beans: &self.beans,
231            function_invoker: &self.function_invoker,
232            tracing_enabled: self.tracing_enabled,
233            tracer_detail_level: &self.tracer_detail_level,
234            tracer_metrics: &self.tracer_metrics,
235            platform_service: &self.platform_service,
236            runtime: &self.runtime,
237            global_error_handler: &self.global_error_handler,
238            health_registry: &self.health_registry,
239            route_registry: &self.routes,
240        }
241    }
242
243    /// Resolve BuilderSteps into BoxProcessors.
244    pub(crate) fn resolve_steps(
245        &self,
246        steps: Vec<BuilderStep>,
247        producer_ctx: &ProducerContext,
248        registry: &Arc<std::sync::Mutex<Registry>>,
249        route_id: Option<&str>,
250        staging_mode: &super::step_resolution::FunctionStagingMode,
251    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
252        let component_ctx = Arc::new(ControllerComponentContext::new(
253            Arc::clone(registry),
254            Arc::clone(&self.languages),
255            self.tracer_metrics
256                .clone()
257                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
258            Arc::clone(&self.platform_service),
259            self.health_registry(),
260            route_id.map(|s| s.to_string()),
261        ));
262        let rt: Arc<dyn camel_component_api::RuntimeObservability> =
263            Arc::clone(&component_ctx) as Arc<_>;
264
265        super::step_resolution::resolve_steps(
266            steps,
267            producer_ctx,
268            rt,
269            registry,
270            &self.languages,
271            &self.beans,
272            self.function_invoker.clone(),
273            component_ctx,
274            route_id,
275            staging_mode,
276        )
277    }
278
279    /// Add a route definition to the controller.
280    ///
281    /// Steps are resolved immediately using the registry.
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if:
286    /// - A route with the same ID already exists
287    /// - Step resolution fails
288    pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
289        let route_id = definition.route_id().to_string();
290
291        if self.routes.contains_key(&route_id) {
292            return Err(CamelError::RouteError(format!(
293                "Route '{}' already exists",
294                route_id
295            )));
296        }
297
298        info!(route_id = %route_id, "Adding route to controller");
299
300        let prepared = match self.build_managed_route(
301            definition,
302            &super::step_resolution::FunctionStagingMode::DirectAdd,
303        ) {
304            Ok(prepared) => prepared,
305            Err(err) => {
306                self.discard_function_staging();
307                return Err(err);
308            }
309        };
310
311        if let Some(invoker) = &self.function_invoker
312            && let Err(err) = invoker.commit_staged().await
313        {
314            invoker.discard_staging(0);
315            return Err(CamelError::Config(err.to_string()));
316        }
317
318        self.routes
319            .insert(prepared.route_id.clone(), prepared.managed);
320
321        Ok(())
322    }
323
324    fn build_managed_route(
325        &self,
326        definition: RouteDefinition,
327        staging_mode: &super::step_resolution::FunctionStagingMode,
328    ) -> Result<PreparedRoute, CamelError> {
329        let route_id = definition.route_id().to_string();
330
331        let definition_info = definition.to_info();
332        let RouteDefinition {
333            from_uri,
334            steps,
335            error_handler,
336            circuit_breaker,
337            security_policy,
338            security_authenticator,
339            unit_of_work,
340            concurrency,
341            ..
342        } = definition;
343
344        let producer_ctx = self.build_producer_context(&route_id)?;
345
346        let mut aggregate_split: Option<AggregateSplitInfo> = None;
347        let processors_with_contracts = match find_top_level_aggregate_requiring_split(&steps) {
348            Some((idx, agg_config)) => {
349                let mut pre_steps = steps;
350                let mut rest = pre_steps.split_off(idx);
351                let _agg_step = rest.remove(0);
352                let post_steps = rest;
353
354                let pre_pairs = self.resolve_steps(
355                    pre_steps,
356                    &producer_ctx,
357                    &self.registry,
358                    Some(&route_id),
359                    staging_mode,
360                )?;
361                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
362                let pre_pipeline =
363                    super::pipeline_runtime::new_shared_pipeline(compose_pipeline(pre_procs));
364
365                let post_pairs = self.resolve_steps(
366                    post_steps,
367                    &producer_ctx,
368                    &self.registry,
369                    Some(&route_id),
370                    staging_mode,
371                )?;
372                let post_procs: Vec<BoxProcessor> =
373                    post_pairs.into_iter().map(|(p, _)| p).collect();
374                let post_pipeline =
375                    super::pipeline_runtime::new_shared_pipeline(compose_pipeline(post_procs));
376
377                aggregate_split = Some(AggregateSplitInfo {
378                    pre_pipeline,
379                    agg_config,
380                    post_pipeline,
381                });
382
383                vec![]
384            }
385            None => self.resolve_steps(
386                steps,
387                &producer_ctx,
388                &self.registry,
389                Some(&route_id),
390                staging_mode,
391            )?,
392        };
393        let route_id_for_tracing = route_id.clone();
394        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
395
396        let mut pipeline = build_eh_config_pipeline(
397            eh_config.as_ref(),
398            Arc::clone(&self.registry),
399            Arc::clone(&self.languages),
400            self.tracer_metrics.clone(),
401            Arc::clone(&self.platform_service),
402            self.health_registry(),
403            &route_id_for_tracing,
404            &producer_ctx,
405            processors_with_contracts,
406            self.tracing_enabled,
407            self.tracer_detail_level.clone(),
408            security_policy.clone(),
409            circuit_breaker,
410        )?;
411
412        let uow_counter = if let Some(uow_config) = &unit_of_work {
413            let component_ctx = Arc::new(ControllerComponentContext::new(
414                Arc::clone(&self.registry),
415                Arc::clone(&self.languages),
416                self.tracer_metrics
417                    .clone()
418                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
419                Arc::clone(&self.platform_service),
420                self.health_registry(),
421                Some(route_id.clone()),
422            ));
423            let rt: Arc<dyn camel_component_api::RuntimeObservability> =
424                Arc::clone(&component_ctx) as Arc<_>;
425            let (uow_layer, counter) = super::route_compiler_ext::resolve_uow_layer(
426                uow_config,
427                &producer_ctx,
428                rt,
429                component_ctx.as_ref(),
430                None,
431            )?;
432            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
433            Some(counter)
434        } else {
435            None
436        };
437
438        Ok(PreparedRoute {
439            route_id,
440            managed: ManagedRoute {
441                definition: definition_info,
442                from_uri,
443                pipeline: super::pipeline_runtime::new_shared_pipeline(pipeline),
444                concurrency,
445                consumer_handle: None,
446                pipeline_handle: None,
447                consumer_cancel_token: CancellationToken::new(),
448                pipeline_cancel_token: CancellationToken::new(),
449                channel_sender: None,
450                in_flight: uow_counter,
451                aggregate_split,
452                agg_service: None,
453                compiled: route_runtime_state::CompiledRoute {
454                    security_policy,
455                    security_authenticator,
456                },
457            },
458        })
459    }
460
461    pub(crate) fn insert_prepared_route(
462        &mut self,
463        prepared: PreparedRoute,
464    ) -> Result<(), CamelError> {
465        if self.routes.contains_key(&prepared.route_id) {
466            return Err(CamelError::RouteError(format!(
467                "Route '{}' already exists",
468                prepared.route_id
469            )));
470        }
471        self.routes
472            .insert(prepared.route_id.clone(), prepared.managed);
473        Ok(())
474    }
475
476    pub async fn add_route_with_generation(
477        &mut self,
478        definition: RouteDefinition,
479        generation: u64,
480    ) -> Result<(), CamelError> {
481        let route_id = definition.route_id().to_string();
482
483        if self.routes.contains_key(&route_id) {
484            return Err(CamelError::RouteError(format!(
485                "Route '{}' already exists",
486                route_id
487            )));
488        }
489
490        info!(route_id = %route_id, generation, "Adding route to controller with generation");
491
492        let prepared = self.build_managed_route(
493            definition,
494            &super::step_resolution::FunctionStagingMode::HotReload { generation },
495        )?;
496
497        self.routes
498            .insert(prepared.route_id.clone(), prepared.managed);
499
500        Ok(())
501    }
502
503    pub(crate) fn prepare_route_definition_with_generation(
504        &self,
505        definition: RouteDefinition,
506        generation: u64,
507    ) -> Result<PreparedRoute, CamelError> {
508        self.build_managed_route(
509            definition,
510            &super::step_resolution::FunctionStagingMode::HotReload { generation },
511        )
512    }
513
514    pub async fn remove_route_preserving_functions(
515        &mut self,
516        route_id: &str,
517    ) -> Result<(), CamelError> {
518        let managed = self.routes.get(route_id).ok_or_else(|| {
519            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
520        })?;
521        if handle_is_running(&managed.consumer_handle)
522            || handle_is_running(&managed.pipeline_handle)
523        {
524            return Err(CamelError::RouteError(format!(
525                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
526                route_id,
527                inferred_lifecycle_label(managed)
528            )));
529        }
530        self.routes.remove(route_id);
531        if let Some(reg) = &self.health_registry {
532            reg.unregister_for_route(route_id);
533        }
534        info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
535        Ok(())
536    }
537
538    /// Compile a route definition into a processor pipeline, without adding it
539    /// to the controller. Used for validation and testing.
540    pub fn compile_route_definition(
541        &self,
542        def: RouteDefinition,
543    ) -> Result<BoxProcessor, CamelError> {
544        self.route_compiler_ext().compile_route_definition(def)
545    }
546
547    /// Compile a route definition with a specific generation (for hot-reload).
548    pub fn compile_route_definition_with_generation(
549        &self,
550        def: RouteDefinition,
551        generation: u64,
552    ) -> Result<BoxProcessor, CamelError> {
553        self.route_compiler_ext()
554            .compile_route_definition_with_generation(def, generation)
555    }
556
557    /// Remove a route from the controller map.
558    ///
559    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
560    /// Returns an error if the route is still running or does not exist.
561    /// Does not cancel any running tasks — call `stop_route` first.
562    pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
563        let managed = self.routes.get(route_id).ok_or_else(|| {
564            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
565        })?;
566        if handle_is_running(&managed.consumer_handle)
567            || handle_is_running(&managed.pipeline_handle)
568        {
569            return Err(CamelError::RouteError(format!(
570                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
571                route_id,
572                inferred_lifecycle_label(managed)
573            )));
574        }
575        if let Some(invoker) = &self.function_invoker {
576            for (id, rid) in self.collect_function_refs(route_id) {
577                if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
578                    warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
579                }
580            }
581        }
582        self.routes.remove(route_id);
583        if let Some(reg) = &self.health_registry {
584            reg.unregister_for_route(route_id);
585        }
586        info!(route_id = %route_id, "Route removed from controller");
587        Ok(())
588    }
589
590    fn collect_function_refs(
591        &self,
592        route_id: &str,
593    ) -> Vec<(camel_api::FunctionId, Option<String>)> {
594        self.function_invoker
595            .as_ref()
596            .map(|invoker| invoker.function_refs_for_route(route_id))
597            .unwrap_or_default()
598    }
599
600    fn discard_function_staging(&self) {
601        if let Some(invoker) = &self.function_invoker {
602            invoker.discard_staging(0);
603        }
604    }
605
    /// Returns the number of routes in the controller.
    ///
    /// Delegates to the route registry.
    pub fn route_count(&self) -> usize {
        self.routes.route_count()
    }
610
    /// Returns the in-flight count the route registry reports for `route_id`.
    ///
    /// NOTE(review): `None` presumably means the route is unknown or has no
    /// unit-of-work counter — confirm against `RouteRegistry::in_flight_count`.
    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
        self.routes.in_flight_count(route_id)
    }
614
    /// Returns `true` if a route with the given ID exists.
    ///
    /// Delegates to the route registry.
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.route_exists(route_id)
    }
619
    /// Returns all route IDs.
    ///
    /// The IDs are returned as owned strings; ordering is whatever the route
    /// registry provides.
    pub fn route_ids(&self) -> Vec<String> {
        self.routes.route_ids()
    }
624
    /// Returns the source hash the route registry holds for `route_id`, if any.
    ///
    /// NOTE(review): presumably a hash of the route's source definition used to
    /// detect changes on reload — confirm against `RouteRegistry::route_source_hash`.
    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.routes.route_source_hash(route_id)
    }
628
    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
    ///
    /// Filtering and sorting are performed by the route registry.
    pub fn auto_startup_route_ids(&self) -> Vec<String> {
        self.routes.auto_startup_route_ids()
    }
633
    /// Returns route IDs sorted by shutdown order (startup order descending).
    ///
    /// Sorting is performed by the route registry.
    pub fn shutdown_route_ids(&self) -> Vec<String> {
        self.routes.shutdown_route_ids()
    }
638
639    /// Atomically swap the pipeline of a route.
640    ///
641    /// In-flight requests finish with the old pipeline (kept alive by Arc).
642    /// New requests immediately use the new pipeline.
643    pub fn swap_pipeline(
644        &self,
645        route_id: &str,
646        new_pipeline: BoxProcessor,
647    ) -> Result<(), CamelError> {
648        let managed = self
649            .routes
650            .get(route_id)
651            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
652
653        if managed.aggregate_split.is_some() {
654            tracing::warn!(
655                route_id = %route_id,
656                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
657            );
658        }
659
660        super::pipeline_runtime::swap_pipeline(&managed.pipeline, new_pipeline);
661        info!(route_id = %route_id, "Pipeline swapped atomically");
662        Ok(())
663    }
664
    /// Returns the from_uri of a route, if it exists.
    ///
    /// The URI is returned as an owned `String`.
    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
        self.routes.route_from_uri(route_id)
    }
669
    /// Get a clone of the current pipeline for a route.
    ///
    /// This is useful for testing and introspection.
    /// Returns `None` if the route doesn't exist.
    ///
    /// Delegates to the route registry.
    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
        self.routes.get_pipeline(route_id)
    }
677
    /// Internal stop implementation.
    ///
    /// Delegates to the route registry's `stop_route` and returns its result
    /// unchanged.
    pub(super) async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.routes.stop_route(route_id).await
    }
682
    /// Starts a route on behalf of the reload path.
    ///
    /// Currently a plain alias for `start_route`; the result is returned unchanged.
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }
686
    /// Stops a route on behalf of the reload path.
    ///
    /// Currently a plain alias for `stop_route`; the result is returned unchanged.
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
690}
691
692// ── Aggregator route helpers ──
693
694impl DefaultRouteController {
    /// Start a route with an aggregate split (pre-pipeline → aggregator → post-pipeline).
    ///
    /// Spawns a biased-select forward loop that routes exchanges through the
    /// pre-pipeline, aggregator, and post-pipeline in sequence, with late-exchange
    /// handling and force-completion on stop.
    ///
    /// Two tasks are spawned and stored on the managed route:
    /// - the pipeline task running the forward loop, and
    /// - a monitor task that wraps the consumer task and, once the consumer has
    ///   finished, force-completes the aggregator (see the comments below).
    ///
    /// The aggregator service is also stored on the managed route as `agg_service`,
    /// and `tx_for_storage` is stored as its `channel_sender`.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if `route_id` is not present in the route registry, or if the
    /// aggregator mutex has been poisoned.
    #[allow(clippy::too_many_arguments)]
    pub(super) async fn start_aggregate_route(
        &mut self,
        route_id: &str,
        split: AggregateSplitInfo,
        consumer: Box<dyn Consumer>,
        consumer_ctx: ConsumerContext,
        mut rx: mpsc::Receiver<ExchangeEnvelope>,
        crash_notifier: Option<mpsc::Sender<CrashNotification>>,
        runtime_for_consumer: Option<Weak<dyn RuntimeHandle>>,
        tx_for_storage: mpsc::Sender<ExchangeEnvelope>,
        // Pipeline cancellation — a child of the managed route's pipeline_cancel_token.
        pipeline_cancel: CancellationToken,
    ) -> Result<(), CamelError> {
        // Channel on which the aggregator service hands exchanges back to this
        // loop outside the normal request path (bounded at 256 entries).
        // NOTE(review): presumably timeout/forced completions — confirm in
        // `AggregatorService`.
        let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);

        let route_cancel_clone = pipeline_cancel.clone();
        let svc = AggregatorService::new(
            split.agg_config.clone(),
            late_tx,
            Arc::clone(&self.languages),
            route_cancel_clone,
        );
        // Shared between the forward loop, the monitor task and the managed route.
        let agg = Arc::new(std::sync::Mutex::new(svc));

        let pipeline_cancel_for_monitor = pipeline_cancel.clone();
        let agg_for_monitor = Arc::clone(&agg);

        {
            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist"); // allow-unwrap
            managed.agg_service = Some(Arc::clone(&agg));
        }

        // Wrapped in an async mutex so both the late-exchange branch and the
        // cancellation branch of the loop can receive from it.
        let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
        let pre_pipeline = split.pre_pipeline;
        let post_pipeline = split.post_pipeline;

        // Spawn biased select forward loop
        let pipeline_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Branches are polled in declaration order: late exchanges,
                    // then incoming envelopes, then cancellation.
                    biased;

                    // 1) Exchange emitted on the late channel: run it through
                    //    the post-pipeline only; failures are logged.
                    late_ex = async {
                        let mut rx = late_rx.lock().await;
                        rx.recv().await
                    } => {
                        match late_ex {
                            Some(ex) => {
                                let pipe = post_pipeline.load();
                                if let Err(e) = pipe.clone_inner().oneshot(ex).await {
                                    tracing::warn!(error = %e, "late exchange post-pipeline failed");
                                }
                            }
                            // Late channel closed: end the task.
                            None => return,
                        }
                    }

                    // 2) Envelope from the consumer: pre-pipeline → aggregator →
                    //    (if not pending) post-pipeline.
                    envelope_opt = rx.recv() => {
                        match envelope_opt {
                            Some(envelope) => {
                                let ExchangeEnvelope { exchange, reply_tx } = envelope;
                                let pre_pipe = pre_pipeline.load();
                                let ex = match pre_pipe.clone_inner().oneshot(exchange).await {
                                    Ok(ex) => ex,
                                    Err(e) => {
                                        // Pre-pipeline failed: report to the caller (if it
                                        // asked for a reply) and move on to the next envelope.
                                        if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                        continue;
                                    }
                                };

                                let ex = {
                                    // Clone the service out of the std mutex so the guard
                                    // is released before the `.await` below.
                                    let cloned_svc = agg
                                        .lock()
                                        .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
                                        .clone();
                                    cloned_svc.oneshot(ex).await
                                };

                                match ex {
                                    Ok(ex) => {
                                        if !is_pending(&ex) {
                                            // Not pending: continue into the post-pipeline
                                            // and reply with its outcome.
                                            let post_pipe = post_pipeline.load();
                                            let out = post_pipe.clone_inner().oneshot(ex).await;
                                            if let Some(tx) = reply_tx { let _ = tx.send(out); }
                                        } else if let Some(tx) = reply_tx {
                                            // Still pending: reply with the pending exchange
                                            // without running the post-pipeline.
                                            let _ = tx.send(Ok(ex));
                                        }
                                    }
                                    Err(e) => {
                                        if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                    }
                                }
                            }
                            // Consumer channel closed: end the task.
                            None => return,
                        }
                    }

                    // 3) Cancellation: force-complete the aggregator, then drain
                    //    whatever is already queued on the late channel through
                    //    the post-pipeline (errors ignored) and exit the loop.
                    _ = pipeline_cancel.cancelled() => {
                        {
                            let guard = agg
                                .lock()
                                .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
                            guard.force_complete_all();
                        }
                        let mut rx_guard = late_rx.lock().await;
                        while let Ok(late_ex) = rx_guard.try_recv() {
                            let pipe = post_pipeline.load();
                            let _ = pipe.clone_inner().oneshot(late_ex).await;
                        }
                        break;
                    }
                }
            }
        });
        #[cfg(test)]
        emit_start_route_event("pipeline_spawned");

        // Start consumer after pipeline loop is spawned to avoid startup races
        // where consumers emit exchanges before the route pipeline begins polling.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            false,
        );

        // Extend the stored consumer handle through aggregate force-completion.
        // While this monitor drains pending buckets, handle_is_running still reports
        // the Route as running because forced exchanges may still be in post-pipeline.
        let force_on_stop = agg_for_monitor
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
            .config()
            .force_completion_on_stop;
        let consumer_handle = tokio::spawn(async move {
            // Wait for the real consumer task; its result is intentionally ignored.
            let _ = consumer_handle.await;
            // Only act if the pipeline was not already cancelled — the cancel
            // branch of the forward loop performs its own force-completion.
            if !pipeline_cancel_for_monitor.is_cancelled() {
                let guard = agg_for_monitor
                    .lock()
                    .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
                guard.force_complete_all();
                drop(guard);
                // With `force_completion_on_stop` set, also cancel the pipeline
                // token so the forward loop takes its cancellation branch.
                if force_on_stop {
                    pipeline_cancel_for_monitor.cancel();
                }
            }
        });
        #[cfg(test)]
        emit_start_route_event("consumer_spawned");

        {
            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist"); // allow-unwrap
            managed.consumer_handle = Some(consumer_handle);
            managed.pipeline_handle = Some(pipeline_handle);
            managed.channel_sender = Some(tx_for_storage);
        }

        info!(route_id = %route_id, "Route started (aggregate with timeout)");
        Ok(())
    }
869}
870
871#[cfg(test)]
872#[path = "route_controller_tests.rs"]
873mod tests;