Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::{Arc, Weak};
8use std::time::Duration;
9
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, ServiceExt};
13use tracing::{info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::metrics::MetricsCollector;
17#[allow(unused_imports)]
18use camel_api::{
19    BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
20    NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeHandle,
21    StepLifecycle,
22};
23use camel_component_api::{Consumer, ConsumerContext, consumer::ExchangeEnvelope};
24use camel_processor::aggregator::AggregatorService;
25pub use camel_processor::aggregator::SharedLanguageRegistry;
26
27use crate::health_registry::HealthCheckRegistry;
28use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
29use crate::lifecycle::adapters::route_compiler_ext::{RouteCompilerExt, build_eh_config_pipeline};
30use crate::lifecycle::adapters::route_helpers::{
31    AggregateSplitInfo, CrashNotification, ManagedRoute, assert_no_mixed_top_level_splits,
32    handle_is_running, inferred_lifecycle_label, is_pending,
33};
34pub(crate) use crate::lifecycle::adapters::route_helpers::{CompiledPipeline, PreparedRoute};
35#[cfg(test)]
36pub(super) use crate::lifecycle::adapters::route_helpers::{
37    emit_start_route_event, set_start_route_event_hook,
38};
39use crate::lifecycle::adapters::route_registry::RouteRegistry;
40use crate::lifecycle::adapters::route_runtime_state;
41use crate::lifecycle::adapters::step_compilers::CompiledStep;
42use crate::lifecycle::application::route_definition::{BuilderStep, RouteDefinition};
43use crate::shared::components::domain::Registry;
44use crate::shared::observability::domain::{DetailLevel, TracerConfig};
45use camel_bean::BeanRegistry;
46
47/// Default implementation of [`RouteController`].
48///
49/// Manages route lifecycle with support for:
50/// - Starting/stopping individual routes
51/// - Suspending and resuming routes
52/// - Auto-startup with startup ordering
53/// - Graceful shutdown
54pub struct DefaultRouteController {
55    /// Routes indexed by route ID.
56    pub(super) routes: RouteRegistry,
57    /// Reference to the component registry for resolving endpoints.
58    pub(super) registry: Arc<std::sync::Mutex<Registry>>,
59    /// Shared language registry for resolving declarative language expressions.
60    pub(super) languages: SharedLanguageRegistry,
61    /// Bean registry for bean method invocation.
62    pub(super) beans: Arc<std::sync::Mutex<BeanRegistry>>,
63    /// Runtime handle injected into ProducerContext for command/query operations.
64    pub(super) runtime: Option<Weak<dyn RuntimeHandle>>,
65    /// Optional global error handler applied to all routes without a per-route handler.
66    pub(super) global_error_handler: Option<ErrorHandlerConfig>,
67    /// Optional crash notifier for supervision.
68    pub(super) crash_notifier: Option<mpsc::Sender<CrashNotification>>,
69    /// Whether tracing is enabled for route pipelines.
70    pub(super) tracing_enabled: bool,
71    /// Detail level for tracing when enabled.
72    pub(super) tracer_detail_level: DetailLevel,
73    /// Metrics collector for tracing processor.
74    pub(super) tracer_metrics: Option<Arc<dyn MetricsCollector>>,
75    pub(super) platform_service: Arc<dyn PlatformService>,
76    pub(super) function_invoker: Option<Arc<dyn FunctionInvoker>>,
77    pub(super) health_registry: Option<Arc<HealthCheckRegistry>>,
78    /// Shared idempotent repository registry. Defaults to an empty registry;
79    /// the CamelContext builder installs a populated handle that includes the
80    /// built-in `"memory"` repository.
81    pub(super) idempotent_repositories: crate::SharedIdempotentRegistry,
82    pub(super) claim_check_repositories: crate::SharedClaimCheckRegistry,
83}
84
85impl DefaultRouteController {
86    pub(super) fn health_registry(&self) -> Arc<HealthCheckRegistry> {
87        self.health_registry.clone().unwrap_or_else(|| {
88            warn!("health_registry not configured — creating isolated fallback");
89            Arc::new(HealthCheckRegistry::new(Duration::from_secs(5)))
90        })
91    }
92
93    /// Create a new `DefaultRouteController` with the given registry.
94    pub fn new(
95        registry: Arc<std::sync::Mutex<Registry>>,
96        platform_service: Arc<dyn PlatformService>,
97    ) -> Self {
98        Self::with_beans_and_platform_service(
99            registry,
100            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
101            platform_service,
102        )
103    }
104
105    /// Create a new `DefaultRouteController` with shared bean registry.
106    pub fn with_beans(
107        registry: Arc<std::sync::Mutex<Registry>>,
108        beans: Arc<std::sync::Mutex<BeanRegistry>>,
109    ) -> Self {
110        Self::with_beans_and_platform_service(
111            registry,
112            beans,
113            Arc::new(NoopPlatformService::default()),
114        )
115    }
116
117    fn with_beans_and_platform_service(
118        registry: Arc<std::sync::Mutex<Registry>>,
119        beans: Arc<std::sync::Mutex<BeanRegistry>>,
120        platform_service: Arc<dyn PlatformService>,
121    ) -> Self {
122        Self {
123            routes: RouteRegistry::new(),
124            registry,
125            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
126            beans,
127            runtime: None,
128            global_error_handler: None,
129            crash_notifier: None,
130            tracing_enabled: false,
131            tracer_detail_level: DetailLevel::Minimal,
132            tracer_metrics: None,
133            platform_service,
134            function_invoker: None,
135            health_registry: None,
136            idempotent_repositories: Arc::new(crate::IdempotentRegistry::new()),
137            claim_check_repositories: Arc::new(crate::ClaimCheckRegistry::new()),
138        }
139    }
140
141    /// Create a new `DefaultRouteController` with shared language registry.
142    pub fn with_languages(
143        registry: Arc<std::sync::Mutex<Registry>>,
144        languages: SharedLanguageRegistry,
145        platform_service: Arc<dyn PlatformService>,
146    ) -> Self {
147        Self {
148            routes: RouteRegistry::new(),
149            registry,
150            languages,
151            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
152            runtime: None,
153            global_error_handler: None,
154            crash_notifier: None,
155            tracing_enabled: false,
156            tracer_detail_level: DetailLevel::Minimal,
157            tracer_metrics: None,
158            platform_service,
159            function_invoker: None,
160            health_registry: None,
161            idempotent_repositories: Arc::new(crate::IdempotentRegistry::new()),
162            claim_check_repositories: Arc::new(crate::ClaimCheckRegistry::new()),
163        }
164    }
165
166    pub fn with_languages_and_beans(
167        registry: Arc<std::sync::Mutex<Registry>>,
168        languages: SharedLanguageRegistry,
169        platform_service: Arc<dyn PlatformService>,
170        beans: Arc<std::sync::Mutex<BeanRegistry>>,
171    ) -> Self {
172        Self {
173            routes: RouteRegistry::new(),
174            registry,
175            languages,
176            beans,
177            runtime: None,
178            global_error_handler: None,
179            crash_notifier: None,
180            tracing_enabled: false,
181            tracer_detail_level: DetailLevel::Minimal,
182            tracer_metrics: None,
183            platform_service,
184            function_invoker: None,
185            health_registry: None,
186            idempotent_repositories: Arc::new(crate::IdempotentRegistry::new()),
187            claim_check_repositories: Arc::new(crate::ClaimCheckRegistry::new()),
188        }
189    }
190
191    pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
192        self.function_invoker = Some(function_invoker);
193        self
194    }
195
196    pub(crate) fn set_idempotent_repositories(
197        &mut self,
198        repositories: crate::SharedIdempotentRegistry,
199    ) {
200        self.idempotent_repositories = repositories;
201    }
202
203    pub(crate) fn set_claim_check_repositories(
204        &mut self,
205        repositories: crate::SharedClaimCheckRegistry,
206    ) {
207        self.claim_check_repositories = repositories;
208    }
209
210    pub fn set_health_registry(&mut self, registry: Arc<HealthCheckRegistry>) {
211        self.health_registry = Some(registry);
212    }
213
214    pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
215        self.function_invoker = Some(invoker);
216    }
217
218    /// Set runtime handle for ProducerContext creation.
219    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
220        self.runtime = Some(Arc::downgrade(&runtime));
221    }
222
223    /// Set the crash notifier for supervision.
224    ///
225    /// When set, the controller will send a [`CrashNotification`] whenever
226    /// a consumer crashes.
227    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
228        self.crash_notifier = Some(tx);
229    }
230
231    /// Set a global error handler applied to all routes without a per-route handler.
232    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
233        self.global_error_handler = Some(config);
234    }
235
236    /// Configure tracing for this route controller.
237    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
238        self.tracing_enabled = config.enabled;
239        self.tracer_detail_level = config.detail_level.clone();
240        self.tracer_metrics = config.metrics_collector.clone();
241    }
242
243    fn build_producer_context(&self, route_id: &str) -> Result<ProducerContext, CamelError> {
244        let mut producer_ctx = ProducerContext::new().with_route_id(route_id);
245        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
246            producer_ctx = producer_ctx.with_runtime(runtime);
247        }
248        Ok(producer_ctx)
249    }
250
251    /// Create a transient [`RouteCompilerExt`] from this controller's fields.
252    fn route_compiler_ext(&self) -> RouteCompilerExt<'_> {
253        RouteCompilerExt {
254            registry: &self.registry,
255            languages: &self.languages,
256            beans: &self.beans,
257            function_invoker: &self.function_invoker,
258            tracing_enabled: self.tracing_enabled,
259            tracer_detail_level: &self.tracer_detail_level,
260            tracer_metrics: &self.tracer_metrics,
261            platform_service: &self.platform_service,
262            runtime: &self.runtime,
263            global_error_handler: &self.global_error_handler,
264            health_registry: &self.health_registry,
265            route_registry: &self.routes,
266            idempotent_repositories: Arc::clone(&self.idempotent_repositories),
267            claim_check_repositories: Arc::clone(&self.claim_check_repositories),
268        }
269    }
270
271    /// Resolve BuilderSteps into BoxProcessors.
272    #[allow(dead_code)] // used by tests and may be needed for future split paths
273    pub(crate) fn resolve_steps(
274        &self,
275        steps: Vec<BuilderStep>,
276        producer_ctx: &ProducerContext,
277        registry: &Arc<std::sync::Mutex<Registry>>,
278        route_id: Option<&str>,
279        staging_mode: &super::step_resolution::FunctionStagingMode,
280    ) -> Result<Vec<CompiledStep>, CamelError> {
281        let component_ctx = Arc::new(ControllerComponentContext::new(
282            Arc::clone(registry),
283            Arc::clone(&self.languages),
284            self.tracer_metrics
285                .clone()
286                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
287            Arc::clone(&self.platform_service),
288            self.health_registry(),
289            route_id.map(|s| s.to_string()),
290        ));
291        let rt: Arc<dyn camel_component_api::RuntimeObservability> =
292            Arc::clone(&component_ctx) as Arc<_>;
293
294        super::step_resolution::resolve_steps(
295            steps,
296            producer_ctx,
297            rt,
298            registry,
299            &self.languages,
300            &self.beans,
301            self.function_invoker.clone(),
302            component_ctx,
303            route_id,
304            staging_mode,
305            &self.idempotent_repositories,
306            &self.claim_check_repositories,
307        )
308    }
309
310    /// Add a route definition to the controller.
311    ///
312    /// Steps are resolved immediately using the registry.
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if:
317    /// - A route with the same ID already exists
318    /// - Step resolution fails
319    pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
320        let route_id = definition.route_id().to_string();
321
322        if self.routes.contains_key(&route_id) {
323            return Err(CamelError::RouteError(format!(
324                "Route '{}' already exists",
325                route_id
326            )));
327        }
328
329        info!(route_id = %route_id, "Adding route to controller");
330
331        let prepared = match self.build_managed_route(
332            definition,
333            &super::step_resolution::FunctionStagingMode::DirectAdd,
334        ) {
335            Ok(prepared) => prepared,
336            Err(err) => {
337                self.discard_function_staging();
338                return Err(err);
339            }
340        };
341
342        if let Some(invoker) = &self.function_invoker
343            && let Err(err) = invoker.commit_staged().await
344        {
345            invoker.discard_staging(0);
346            return Err(CamelError::Config(err.to_string()));
347        }
348
349        self.routes
350            .insert(prepared.route_id.clone(), prepared.managed);
351
352        Ok(())
353    }
354
355    fn build_managed_route(
356        &self,
357        definition: RouteDefinition,
358        staging_mode: &super::step_resolution::FunctionStagingMode,
359    ) -> Result<PreparedRoute, CamelError> {
360        let route_id = definition.route_id().to_string();
361
362        let definition_info = definition.to_info();
363        let RouteDefinition {
364            from_uri,
365            steps,
366            error_handler,
367            circuit_breaker,
368            security_policy,
369            security_authenticator,
370            unit_of_work,
371            concurrency,
372            ..
373        } = definition;
374
375        let producer_ctx = self.build_producer_context(&route_id)?;
376
377        // N2: reject mixed Aggregate + Resequence top-level splits
378        assert_no_mixed_top_level_splits(&steps)?;
379
380        let (aggregate_split, processors_with_contracts) = self
381            .route_compiler_ext()
382            .detect_and_validate_route_split(steps, &producer_ctx, &route_id, staging_mode)?;
383        let lifecycle = super::route_helpers::collect_lifecycle(&processors_with_contracts);
384        let route_id_for_tracing = route_id.clone();
385        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
386
387        let mut pipeline = build_eh_config_pipeline(
388            eh_config.as_ref(),
389            Arc::clone(&self.registry),
390            Arc::clone(&self.languages),
391            self.tracer_metrics.clone(),
392            Arc::clone(&self.platform_service),
393            self.health_registry(),
394            &route_id_for_tracing,
395            &producer_ctx,
396            processors_with_contracts,
397            self.tracing_enabled,
398            self.tracer_detail_level.clone(),
399            security_policy.clone(),
400            circuit_breaker,
401        )?;
402
403        let uow_counter = if let Some(uow_config) = &unit_of_work {
404            let component_ctx = Arc::new(ControllerComponentContext::new(
405                Arc::clone(&self.registry),
406                Arc::clone(&self.languages),
407                self.tracer_metrics
408                    .clone()
409                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
410                Arc::clone(&self.platform_service),
411                self.health_registry(),
412                Some(route_id.clone()),
413            ));
414            let rt: Arc<dyn camel_component_api::RuntimeObservability> =
415                Arc::clone(&component_ctx) as Arc<_>;
416            let (uow_layer, counter) = super::route_compiler_ext::resolve_uow_layer(
417                uow_config,
418                &producer_ctx,
419                rt,
420                component_ctx.as_ref(),
421                None,
422            )?;
423            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
424            Some(counter)
425        } else {
426            None
427        };
428
429        Ok(PreparedRoute {
430            route_id,
431            managed: ManagedRoute {
432                definition: definition_info,
433                from_uri,
434                pipeline: super::pipeline_runtime::new_shared_pipeline_with_lifecycle(
435                    pipeline, lifecycle,
436                ),
437                concurrency,
438                consumer_handle: None,
439                pipeline_handle: None,
440                consumer_cancel_token: CancellationToken::new(),
441                pipeline_cancel_token: CancellationToken::new(),
442                channel_sender: None,
443                in_flight: uow_counter,
444                aggregate_split,
445                agg_service: None,
446                compiled: route_runtime_state::CompiledRoute {
447                    security_policy,
448                    security_authenticator,
449                },
450            },
451        })
452    }
453
454    pub(crate) fn insert_prepared_route(
455        &mut self,
456        prepared: PreparedRoute,
457    ) -> Result<(), CamelError> {
458        if self.routes.contains_key(&prepared.route_id) {
459            return Err(CamelError::RouteError(format!(
460                "Route '{}' already exists",
461                prepared.route_id
462            )));
463        }
464        self.routes
465            .insert(prepared.route_id.clone(), prepared.managed);
466        Ok(())
467    }
468
469    pub async fn add_route_with_generation(
470        &mut self,
471        definition: RouteDefinition,
472        generation: u64,
473    ) -> Result<(), CamelError> {
474        let route_id = definition.route_id().to_string();
475
476        if self.routes.contains_key(&route_id) {
477            return Err(CamelError::RouteError(format!(
478                "Route '{}' already exists",
479                route_id
480            )));
481        }
482
483        info!(route_id = %route_id, generation, "Adding route to controller with generation");
484
485        let prepared = self.build_managed_route(
486            definition,
487            &super::step_resolution::FunctionStagingMode::HotReload { generation },
488        )?;
489
490        self.routes
491            .insert(prepared.route_id.clone(), prepared.managed);
492
493        Ok(())
494    }
495
496    pub(crate) fn prepare_route_definition_with_generation(
497        &self,
498        definition: RouteDefinition,
499        generation: u64,
500    ) -> Result<PreparedRoute, CamelError> {
501        self.build_managed_route(
502            definition,
503            &super::step_resolution::FunctionStagingMode::HotReload { generation },
504        )
505    }
506
507    pub async fn remove_route_preserving_functions(
508        &mut self,
509        route_id: &str,
510    ) -> Result<(), CamelError> {
511        let managed = self.routes.get(route_id).ok_or_else(|| {
512            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
513        })?;
514        if handle_is_running(&managed.consumer_handle)
515            || handle_is_running(&managed.pipeline_handle)
516        {
517            return Err(CamelError::RouteError(format!(
518                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
519                route_id,
520                inferred_lifecycle_label(managed)
521            )));
522        }
523        self.routes.remove(route_id);
524        if let Some(reg) = &self.health_registry {
525            reg.unregister_for_route(route_id);
526        }
527        info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
528        Ok(())
529    }
530
531    /// Compile a route definition into a processor pipeline, without adding it
532    /// to the controller. Used for validation and testing.
533    pub fn compile_route_definition(
534        &self,
535        def: RouteDefinition,
536    ) -> Result<BoxProcessor, CamelError> {
537        self.route_compiler_ext().compile_route_definition(def)
538    }
539
540    /// Compile a route definition with a specific generation (for hot-reload).
541    pub fn compile_route_definition_with_generation(
542        &self,
543        def: RouteDefinition,
544        generation: u64,
545    ) -> Result<BoxProcessor, CamelError> {
546        self.route_compiler_ext()
547            .compile_route_definition_with_generation(def, generation)
548    }
549
550    /// Compile a route definition into a [`CompiledPipeline`] (processor +
551    /// lifecycle handles). Used by the hot-reload Restart path so that
552    /// lifecycle handles are threaded through
553    /// [`swap_pipeline_raw`](Self::swap_pipeline_raw).
554    pub(crate) fn compile_route_definition_pipeline(
555        &self,
556        def: RouteDefinition,
557        generation: u64,
558    ) -> Result<CompiledPipeline, CamelError> {
559        self.route_compiler_ext()
560            .compile_route_definition_pipeline(def, generation)
561    }
562
563    /// Compile without function generation, returning full [`CompiledPipeline`].
564    ///
565    /// Oracle Fix 1: used by the stateless hot-reload path so that
566    /// lifecycle-bearing routes have their handles preserved.
567    pub(crate) fn compile_route_definition_dry_pipeline(
568        &self,
569        def: RouteDefinition,
570    ) -> Result<CompiledPipeline, CamelError> {
571        self.route_compiler_ext()
572            .compile_route_definition_dry_pipeline(def)
573    }
574
575    /// Remove a route from the controller map.
576    ///
577    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
578    /// Returns an error if the route is still running or does not exist.
579    /// Does not cancel any running tasks — call `stop_route` first.
580    pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
581        let managed = self.routes.get(route_id).ok_or_else(|| {
582            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
583        })?;
584        if handle_is_running(&managed.consumer_handle)
585            || handle_is_running(&managed.pipeline_handle)
586        {
587            return Err(CamelError::RouteError(format!(
588                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
589                route_id,
590                inferred_lifecycle_label(managed)
591            )));
592        }
593        if let Some(invoker) = &self.function_invoker {
594            for (id, rid) in self.collect_function_refs(route_id) {
595                if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
596                    warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
597                }
598            }
599        }
600        self.routes.remove(route_id);
601        if let Some(reg) = &self.health_registry {
602            reg.unregister_for_route(route_id);
603        }
604        info!(route_id = %route_id, "Route removed from controller");
605        Ok(())
606    }
607
608    fn collect_function_refs(
609        &self,
610        route_id: &str,
611    ) -> Vec<(camel_api::FunctionId, Option<String>)> {
612        self.function_invoker
613            .as_ref()
614            .map(|invoker| invoker.function_refs_for_route(route_id))
615            .unwrap_or_default()
616    }
617
618    fn discard_function_staging(&self) {
619        if let Some(invoker) = &self.function_invoker {
620            invoker.discard_staging(0);
621        }
622    }
623
624    /// Returns the number of routes in the controller.
625    pub fn route_count(&self) -> usize {
626        self.routes.route_count()
627    }
628
629    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
630        self.routes.in_flight_count(route_id)
631    }
632
633    /// Returns `true` if a route with the given ID exists.
634    pub fn route_exists(&self, route_id: &str) -> bool {
635        self.routes.route_exists(route_id)
636    }
637
638    /// Returns all route IDs.
639    pub fn route_ids(&self) -> Vec<String> {
640        self.routes.route_ids()
641    }
642
643    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
644        self.routes.route_source_hash(route_id)
645    }
646
647    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
648    pub fn auto_startup_route_ids(&self) -> Vec<String> {
649        self.routes.auto_startup_route_ids()
650    }
651
652    /// Returns route IDs sorted by shutdown order (startup order descending).
653    pub fn shutdown_route_ids(&self) -> Vec<String> {
654        self.routes.shutdown_route_ids()
655    }
656
657    /// Atomically swap the pipeline of a route (zero-downtime).
658    ///
659    /// In-flight requests finish with the old pipeline (kept alive by Arc).
660    /// New requests immediately use the new pipeline.
661    ///
662    /// ## Rejection policy
663    ///
664    /// Returns an error if the route has lifecycle-bearing steps or an active
665    /// aggregate — these require the **Restart path** (stop → swap → start).
666    ///
667    /// The caller (e.g. `reload_actions::apply_swap`) MUST catch this rejection
668    /// and fall back to:
669    /// 1. `stop_route_reload` — drain lifecycle, stop consumer
670    /// 2. `swap_pipeline_raw` — bypass the lifecycle check (route is stopped)
671    /// 3. `start_route_reload` — re-create consumer with the new pipeline
672    ///
673    /// This is the "reject, don't defer" policy (oracle Fix 3): the swap is
674    /// refused upfront rather than silently deferring or partially swapping.
675    pub fn swap_pipeline(
676        &self,
677        route_id: &str,
678        new_pipeline: BoxProcessor,
679    ) -> Result<(), CamelError> {
680        let managed = self
681            .routes
682            .get(route_id)
683            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
684
685        let assembly = managed.pipeline.load();
686        let has_lifecycle = !assembly.lifecycle.is_empty();
687
688        if has_lifecycle || managed.agg_service.is_some() {
689            warn!(
690                route_id = %route_id,
691                "Hot-swap rejected — route has lifecycle/agg steps; use Restart path"
692            );
693            return Err(CamelError::RouteError(format!(
694                "Route '{}' contains stateful steps (lifecycle-bearing). Hot-swap not supported — use restart.",
695                route_id
696            )));
697        }
698
699        drop(assembly);
700
701        if managed.aggregate_split.is_some() {
702            warn!(
703                route_id = %route_id,
704                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
705            );
706        }
707
708        super::pipeline_runtime::swap_pipeline_raw(&managed.pipeline, new_pipeline, vec![]);
709        info!(route_id = %route_id, "Pipeline swapped atomically");
710        Ok(())
711    }
712
713    /// Non-checking raw pipeline swap — bypasses lifecycle/aggregate rejection.
714    ///
715    /// Only for use after the route has been stopped (Restart path).
716    /// Does NOT check for lifecycle handles or aggregate service — the caller
717    /// is responsible for ensuring the route is safe to swap.
718    ///
719    /// Accepts `lifecycle` so that the new pipeline assembly records the
720    /// lifecycle handles from the compiled steps.  When the route is
721    /// subsequently stopped, these handles are drained.
722    pub(crate) fn swap_pipeline_raw(
723        &self,
724        route_id: &str,
725        new_pipeline: BoxProcessor,
726        lifecycle: Vec<Arc<dyn StepLifecycle>>,
727    ) -> Result<(), CamelError> {
728        let managed = self
729            .routes
730            .get(route_id)
731            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
732        super::pipeline_runtime::swap_pipeline_raw(&managed.pipeline, new_pipeline, lifecycle);
733        info!(route_id = %route_id, "Pipeline swapped (raw — lifecycle bypass)");
734        Ok(())
735    }
736
737    /// Returns the from_uri of a route, if it exists.
738    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
739        self.routes.route_from_uri(route_id)
740    }
741
742    /// Get a clone of the current pipeline for a route.
743    ///
744    /// This is useful for testing and introspection.
745    /// Returns `None` if the route doesn't exist.
746    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
747        self.routes.get_pipeline(route_id)
748    }
749
750    /// Check whether the running route has lifecycle-bearing steps.
751    ///
752    /// Returns `false` when the route is missing.
753    pub(crate) fn route_has_lifecycle(&self, route_id: &str) -> bool {
754        self.routes
755            .get(route_id)
756            .map(|managed| !managed.pipeline.load().lifecycle.is_empty())
757            .unwrap_or(false)
758    }
759
760    /// Internal stop implementation that can set custom status.
761    pub(super) async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
762        self.routes.stop_route(route_id).await
763    }
764
765    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
766        self.start_route(route_id).await
767    }
768
769    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
770        self.stop_route(route_id).await
771    }
772}
773
774// ── Aggregator route helpers ──
775
776impl DefaultRouteController {
777    /// Start a route with an aggregate split (pre-pipeline → aggregator → post-pipeline).
778    ///
779    /// Spawns a biased-select forward loop that routes exchanges through the
780    /// pre-pipeline, aggregator, and post-pipeline in sequence, with late-exchange
781    /// handling and force-completion on stop.
782    #[allow(clippy::too_many_arguments)]
783    pub(super) async fn start_aggregate_route(
784        &mut self,
785        route_id: &str,
786        split: AggregateSplitInfo,
787        consumer: Box<dyn Consumer>,
788        consumer_ctx: ConsumerContext,
789        mut rx: mpsc::Receiver<ExchangeEnvelope>,
790        crash_notifier: Option<mpsc::Sender<CrashNotification>>,
791        runtime_for_consumer: Option<Weak<dyn RuntimeHandle>>,
792        tx_for_storage: mpsc::Sender<ExchangeEnvelope>,
793        // Pipeline cancellation — a child of the managed route's pipeline_cancel_token.
794        pipeline_cancel: CancellationToken,
795    ) -> Result<(), CamelError> {
796        let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
797
798        let route_cancel_clone = pipeline_cancel.clone();
799        let svc = AggregatorService::new(
800            split.agg_config.clone(),
801            late_tx,
802            Arc::clone(&self.languages),
803            route_cancel_clone,
804        );
805        let agg = Arc::new(svc);
806
807        let pipeline_cancel_for_monitor = pipeline_cancel.clone();
808        let agg_for_monitor = Arc::clone(&agg);
809
810        {
811            let managed = self
812                .routes
813                .get_mut(route_id)
814                .expect("invariant: route must exist"); // allow-unwrap
815            managed.agg_service = Some(Arc::clone(&agg));
816        }
817
818        let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
819        let pre_pipeline = split.pre_pipeline;
820        let post_pipeline = split.post_pipeline;
821
822        // Spawn biased select forward loop
823        let pipeline_handle = tokio::spawn(async move {
824            loop {
825                tokio::select! {
826                    biased;
827
828                    late_ex = async {
829                        let mut rx = late_rx.lock().await;
830                        rx.recv().await
831                    } => {
832                        match late_ex {
833                            Some(ex) => {
834                                let pipe = post_pipeline.load();
835                                if let Err(e) = pipe.processor.clone_inner().oneshot(ex).await {
836                                    tracing::warn!(error = %e, "late exchange post-pipeline failed");
837                                }
838                            }
839                            None => return,
840                        }
841                    }
842
843                    envelope_opt = rx.recv() => {
844                        match envelope_opt {
845                            Some(envelope) => {
846                                let ExchangeEnvelope { exchange, reply_tx } = envelope;
847                                let pre_pipe = pre_pipeline.load();
848                                let ex = match pre_pipe.processor.clone_inner().oneshot(exchange).await {
849                                    Ok(ex) => ex,
850                                    Err(e) => {
851                                        if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
852                                        continue;
853                                    }
854                                };
855
856                                let ex = {
857                                    let cloned_svc = agg.as_ref().clone();
858                                    cloned_svc.oneshot(ex).await
859                                };
860
861                                match ex {
862                                    Ok(ex) => {
863                                        if !is_pending(&ex) {
864                                            let post_pipe = post_pipeline.load();
865                                            let out = post_pipe.processor.clone_inner().oneshot(ex).await;
866                                            if let Some(tx) = reply_tx { let _ = tx.send(out); }
867                                        } else if let Some(tx) = reply_tx {
868                                            let _ = tx.send(Ok(ex));
869                                        }
870                                    }
871                                    Err(e) => {
872                                        if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
873                                    }
874                                }
875                            }
876                            None => return,
877                        }
878                    }
879
880                    _ = pipeline_cancel.cancelled() => {
881                        agg.force_complete_all();
882                        let mut rx_guard = late_rx.lock().await;
883                        while let Ok(late_ex) = rx_guard.try_recv() {
884                            let pipe = post_pipeline.load();
885                            let _ = pipe.processor.clone_inner().oneshot(late_ex).await;
886                        }
887                        break;
888                    }
889                }
890            }
891        });
892        #[cfg(test)]
893        emit_start_route_event("pipeline_spawned");
894
895        // Start consumer after pipeline loop is spawned to avoid startup races
896        // where consumers emit exchanges before the route pipeline begins polling.
897        let consumer_handle = super::consumer_management::spawn_consumer_task(
898            route_id.to_string(),
899            consumer,
900            consumer_ctx,
901            crash_notifier,
902            runtime_for_consumer,
903            false,
904        );
905
906        // Extend the stored consumer handle through aggregate force-completion.
907        // While this monitor drains pending buckets, handle_is_running still reports
908        // the Route as running because forced exchanges may still be in post-pipeline.
909        let force_on_stop = agg_for_monitor.config().force_completion_on_stop;
910        let consumer_handle = tokio::spawn(async move {
911            let _ = consumer_handle.await;
912            if !pipeline_cancel_for_monitor.is_cancelled() {
913                agg_for_monitor.force_complete_all();
914                if force_on_stop {
915                    pipeline_cancel_for_monitor.cancel();
916                }
917            }
918        });
919        #[cfg(test)]
920        emit_start_route_event("consumer_spawned");
921
922        {
923            let managed = self
924                .routes
925                .get_mut(route_id)
926                .expect("invariant: route must exist"); // allow-unwrap
927            managed.consumer_handle = Some(consumer_handle);
928            managed.pipeline_handle = Some(pipeline_handle);
929            managed.channel_sender = Some(tx_for_storage);
930        }
931
932        info!(route_id = %route_id, "Route started (aggregate with timeout)");
933        Ok(())
934    }
935
936    /// Test-only: inject lifecycle handles into an existing route's pipeline
937    /// assembly.  This makes the route lifecycle-bearing so that swap_pipeline
938    /// rejects it, forcing callers (like reload_actions::apply_swap) to take
939    /// the Restart path instead.
940    #[cfg(test)]
941    pub(crate) fn set_route_lifecycle_for_test(
942        &mut self,
943        route_id: &str,
944        lifecycle: Vec<Arc<dyn StepLifecycle>>,
945    ) -> Result<(), CamelError> {
946        use super::pipeline_runtime::PipelineAssembly;
947        use camel_api::SyncBoxProcessor;
948        use std::sync::Arc;
949
950        let managed = self
951            .routes
952            .get_mut(route_id)
953            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
954        let old_processor = managed.pipeline.load().processor.clone_inner();
955        managed.pipeline.store(Arc::new(PipelineAssembly::new(
956            SyncBoxProcessor::new(old_processor),
957            lifecycle,
958        )));
959        Ok(())
960    }
961}
962
963#[cfg(test)]
964#[path = "route_controller_tests.rs"]
965mod tests;