Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10
// NOTE(review): five-second default. The name suggests it bounds how long a route shutdown may
// take, but no use of it is visible in this part of the file — confirm against the stop path.
const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::sync::CancellationToken;
15use tower::{Layer, Service, ServiceExt};
16use tracing::{error, info, warn};
17
18use camel_api::UnitOfWorkConfig;
19use camel_api::aggregator::AggregatorConfig;
20use camel_api::error_handler::ErrorHandlerConfig;
21use camel_api::metrics::MetricsCollector;
22use camel_api::{
23    BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
24    NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeCommand,
25    RuntimeHandle,
26};
27use camel_auth::TokenAuthenticator;
28use camel_component_api::{
29    ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
30};
31use camel_endpoint::parse_uri;
32pub use camel_processor::aggregator::SharedLanguageRegistry;
33use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
34use camel_processor::circuit_breaker::CircuitBreakerLayer;
35use camel_processor::error_handler::ErrorHandlerLayer;
36use camel_processor::security_policy_layer::SecurityPolicyLayer;
37
38use crate::health_registry::HealthCheckRegistry;
39use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
40use crate::lifecycle::adapters::route_compiler::{
41    compose_pipeline, compose_traced_pipeline_with_contracts,
42};
43use crate::lifecycle::application::route_definition::{
44    BuilderStep, RouteDefinition, RouteDefinitionInfo,
45};
46use crate::shared::components::domain::Registry;
47use crate::shared::observability::domain::{DetailLevel, TracerConfig};
48use arc_swap::ArcSwap;
49use camel_bean::BeanRegistry;
50
/// Notification sent when a route crashes.
///
/// Delivered over the `mpsc` channel registered through
/// `DefaultRouteController::set_crash_notifier`.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// The ID of the crashed route.
    pub route_id: String,
    /// The error that caused the crash, already rendered as text.
    pub error: String,
}
59
/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
///
/// # Safety
///
/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
///
/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
///    (creates a new boxed service from the inner clone).
/// 2. The clone is owned by the calling thread and never shared.
/// 3. ArcSwap guarantees we only get & references (no &mut).
///
/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
/// clone() does not mutate shared state and each thread gets an independent copy.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: relies on the invariant documented on the type — the wrapped processor is only ever
// reached through `&self` in order to `clone()` it, and each clone stays with the cloning thread.
// NOTE(review): this holds only while every boxed service's `Clone` impl is itself free of
// unsynchronized interior mutability; the compiler no longer checks that — confirm whenever a new
// processor type is introduced.
unsafe impl Sync for SyncBoxProcessor {}
76
/// A processor pipeline behind `Arc<ArcSwap<..>>`: holders share one handle while the processor
/// it points at can be swapped atomically.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
78
/// Test-only callback type: receives a static event label and must be shareable across threads.
#[cfg(test)]
type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;
81
/// Test-only, process-wide slot holding the currently installed start-route event hook.
/// Lazily initialised to `None` (no hook installed).
#[cfg(test)]
static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
85
/// Installs (`Some`) or clears (`None`) the test-only start-route event hook.
///
/// # Panics
///
/// Panics if the hook mutex is poisoned.
#[cfg(test)]
fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
    let mut slot = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    *slot = hook;
}
92
/// Invokes the installed test hook (if any) with `event`.
///
/// The hook is cloned out of the global slot and the mutex is released *before* the call, so a
/// hook that panics (for example a failing assertion) does not poison the lock for later tests,
/// and a hook that installs or clears a hook itself cannot deadlock.
///
/// # Panics
///
/// Panics if the hook mutex is already poisoned.
#[cfg(test)]
fn emit_start_route_event(event: &'static str) {
    // The guard is a temporary of this statement and is dropped at the semicolon.
    let hook = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock")
        .clone();
    if let Some(hook) = hook {
        hook(event);
    }
}
103
/// Pieces of a route that was split around its first top-level aggregate step.
///
/// Built by `build_managed_route` when that aggregate has a timeout completion condition or
/// `force_completion_on_stop` set.
pub(super) struct AggregateSplitInfo {
    /// Pipeline composed from the steps that precede the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    /// Configuration taken from the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    /// Pipeline composed from the steps that follow the aggregate step.
    pub(super) post_pipeline: SharedPipeline,
}
110
/// Internal state for a managed route.
pub(super) struct ManagedRoute {
    /// The route definition metadata (for introspection).
    pub(super) definition: RouteDefinitionInfo,
    /// Source endpoint URI.
    pub(super) from_uri: String,
    /// Resolved processor pipeline (wrapped for atomic swap).
    pub(super) pipeline: SharedPipeline,
    /// Concurrency model override (if any).
    pub(super) concurrency: Option<ConcurrencyModel>,
    /// Handle for the consumer task (if running).
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping the consumer task.
    /// This allows independent control of the consumer lifecycle (for suspend/resume).
    pub(super) consumer_cancel_token: CancellationToken,
    /// Cancellation token for stopping the pipeline task.
    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
    pub(super) pipeline_cancel_token: CancellationToken,
    /// Channel sender for sending exchanges to the pipeline.
    /// Stored to allow resuming a suspended route without recreating the channel.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// In-flight exchange counter. `None` when UoW is not configured for this route.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Pre/post pipelines and aggregator config when the route was split around an aggregate
    /// step; `None` for routes that were not split.
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    /// Shared aggregator service. Left as `None` when the route is built.
    /// NOTE(review): presumably populated when a split route is started — confirm in the start path.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
    /// Stored security policy config for use when starting/resuming the consumer.
    pub(super) security_policy: Option<camel_api::security_policy::SecurityPolicyConfig>,
    /// Stored security authenticator for use when starting/resuming the consumer.
    pub(super) security_authenticator: Option<Arc<dyn TokenAuthenticator>>,
}
142
/// A fully built route that has not yet been registered with the controller.
///
/// Produced by `build_managed_route` and consumed by `insert_prepared_route`.
pub(crate) struct PreparedRoute {
    /// ID under which the route will be stored.
    pub(crate) route_id: String,
    /// The route state to store under `route_id`.
    pub(super) managed: ManagedRoute,
}
147
148pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
149    handle.as_ref().is_some_and(|h| !h.is_finished())
150}
151
152fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
153    match (
154        handle_is_running(&managed.consumer_handle),
155        handle_is_running(&managed.pipeline_handle),
156    ) {
157        (true, true) => "Started",
158        (false, true) => "Suspended",
159        (true, false) => "Stopping",
160        (false, false) => "Stopped",
161    }
162}
163
164fn find_top_level_aggregate_requiring_split(
165    steps: &[BuilderStep],
166) -> Option<(usize, AggregatorConfig)> {
167    for (i, step) in steps.iter().enumerate() {
168        if let BuilderStep::Aggregate { config } = step {
169            if has_timeout_condition(&config.completion) || config.force_completion_on_stop {
170                return Some((i, config.clone()));
171            }
172            break;
173        }
174    }
175    None
176}
177
/// [`ComponentContext`] implementation handed to components and endpoints by the controller.
pub(crate) struct ControllerComponentContext {
    /// Component registry used to resolve URI schemes.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Language registry used to resolve languages by name.
    languages: SharedLanguageRegistry,
    /// Metrics collector returned from `metrics()`.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service returned from `platform_service()`.
    platform_service: Arc<dyn PlatformService>,
    /// Registry that route health checks are registered with and removed from.
    health_registry: Arc<HealthCheckRegistry>,
    /// Route this context was created for, if any.
    route_id: Option<String>,
}
186
187impl ControllerComponentContext {
188    pub(crate) fn new(
189        registry: Arc<std::sync::Mutex<Registry>>,
190        languages: SharedLanguageRegistry,
191        metrics: Arc<dyn MetricsCollector>,
192        platform_service: Arc<dyn PlatformService>,
193        health_registry: Arc<HealthCheckRegistry>,
194        route_id: Option<String>,
195    ) -> Self {
196        Self {
197            registry,
198            languages,
199            metrics,
200            platform_service,
201            health_registry,
202            route_id,
203        }
204    }
205}
206
207impl ComponentContext for ControllerComponentContext {
208    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
209        self.registry.lock().ok()?.get(scheme)
210    }
211
212    fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
213        self.languages.lock().ok()?.get(name).cloned()
214    }
215
216    fn metrics(&self) -> Arc<dyn MetricsCollector> {
217        Arc::clone(&self.metrics)
218    }
219
220    fn platform_service(&self) -> Arc<dyn PlatformService> {
221        Arc::clone(&self.platform_service)
222    }
223
224    fn register_route_health_check(
225        &self,
226        route_id: &str,
227        check: Arc<dyn camel_api::AsyncHealthCheck>,
228    ) {
229        self.health_registry.register_for_route(route_id, check);
230    }
231
232    fn unregister_route_health_check(&self, route_id: &str) {
233        self.health_registry.unregister_for_route(route_id);
234    }
235
236    fn route_id(&self) -> Option<&str> {
237        self.route_id.as_deref()
238    }
239}
240
241fn is_pending(ex: &Exchange) -> bool {
242    ex.property("CamelAggregatorPending")
243        .and_then(|v| v.as_bool())
244        .unwrap_or(false)
245}
246
247/// Wait for a pipeline service to be ready with circuit breaker backoff.
248///
249/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
250/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
251/// cancellation checks. It returns `Ok(())` when the service is ready, or
252/// `Err(e)` if cancellation occurred or a fatal error was encountered.
253async fn ready_with_backoff(
254    pipeline: &mut BoxProcessor,
255    cancel: &CancellationToken,
256) -> Result<(), CamelError> {
257    loop {
258        match pipeline.ready().await {
259            Ok(_) => return Ok(()),
260            Err(CamelError::CircuitOpen(ref msg)) => {
261                warn!("Circuit open, backing off: {msg}");
262                tokio::select! {
263                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
264                        continue;
265                    }
266                    _ = cancel.cancelled() => {
267                        // Shutting down — don't retry.
268                        return Err(CamelError::CircuitOpen(msg.clone()));
269                    }
270                }
271            }
272            Err(e) => {
273                error!("Pipeline not ready: {e}");
274                return Err(e);
275            }
276        }
277    }
278}
279
280fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
281    let stamp = std::time::SystemTime::now()
282        .duration_since(std::time::UNIX_EPOCH)
283        .unwrap_or_default()
284        .as_nanos();
285    RuntimeCommand::FailRoute {
286        route_id: route_id.to_string(),
287        error: error.to_string(),
288        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
289        causation_id: None,
290    }
291}
292
293pub(super) async fn publish_runtime_failure(
294    runtime: Option<Weak<dyn RuntimeHandle>>,
295    route_id: &str,
296    error: &str,
297) {
298    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
299        return;
300    };
301    let command = runtime_failure_command(route_id, error);
302    if let Err(runtime_error) = runtime.execute(command).await {
303        warn!(
304            route_id = %route_id,
305            error = %runtime_error,
306            "failed to synchronize route crash with runtime projection"
307        );
308    }
309}
310
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    /// Held weakly, so the controller does not keep the runtime alive.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    /// Also handed to component contexts, with `NoOpMetrics` substituted when unset.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform service passed on to every component context the controller creates.
    platform_service: Arc<dyn PlatformService>,
    /// Optional function invoker: forwarded to step resolution and used to commit or discard
    /// staged functions when a route is added.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Optional shared health-check registry. While unset, `health_registry()` creates a fresh,
    /// isolated registry on every call.
    health_registry: Option<Arc<HealthCheckRegistry>>,
}
343
344impl DefaultRouteController {
345    fn health_registry(&self) -> Arc<HealthCheckRegistry> {
346        self.health_registry.clone().unwrap_or_else(|| {
347            warn!("health_registry not configured — creating isolated fallback");
348            Arc::new(HealthCheckRegistry::new(Duration::from_secs(5)))
349        })
350    }
351
352    /// Create a new `DefaultRouteController` with the given registry.
353    pub fn new(
354        registry: Arc<std::sync::Mutex<Registry>>,
355        platform_service: Arc<dyn PlatformService>,
356    ) -> Self {
357        Self::with_beans_and_platform_service(
358            registry,
359            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
360            platform_service,
361        )
362    }
363
364    /// Create a new `DefaultRouteController` with shared bean registry.
365    pub fn with_beans(
366        registry: Arc<std::sync::Mutex<Registry>>,
367        beans: Arc<std::sync::Mutex<BeanRegistry>>,
368    ) -> Self {
369        Self::with_beans_and_platform_service(
370            registry,
371            beans,
372            Arc::new(NoopPlatformService::default()),
373        )
374    }
375
376    fn with_beans_and_platform_service(
377        registry: Arc<std::sync::Mutex<Registry>>,
378        beans: Arc<std::sync::Mutex<BeanRegistry>>,
379        platform_service: Arc<dyn PlatformService>,
380    ) -> Self {
381        Self {
382            routes: HashMap::new(),
383            registry,
384            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
385            beans,
386            runtime: None,
387            global_error_handler: None,
388            crash_notifier: None,
389            tracing_enabled: false,
390            tracer_detail_level: DetailLevel::Minimal,
391            tracer_metrics: None,
392            platform_service,
393            function_invoker: None,
394            health_registry: None,
395        }
396    }
397
398    /// Create a new `DefaultRouteController` with shared language registry.
399    pub fn with_languages(
400        registry: Arc<std::sync::Mutex<Registry>>,
401        languages: SharedLanguageRegistry,
402        platform_service: Arc<dyn PlatformService>,
403    ) -> Self {
404        Self {
405            routes: HashMap::new(),
406            registry,
407            languages,
408            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
409            runtime: None,
410            global_error_handler: None,
411            crash_notifier: None,
412            tracing_enabled: false,
413            tracer_detail_level: DetailLevel::Minimal,
414            tracer_metrics: None,
415            platform_service,
416            function_invoker: None,
417            health_registry: None,
418        }
419    }
420
421    pub fn with_languages_and_beans(
422        registry: Arc<std::sync::Mutex<Registry>>,
423        languages: SharedLanguageRegistry,
424        platform_service: Arc<dyn PlatformService>,
425        beans: Arc<std::sync::Mutex<BeanRegistry>>,
426    ) -> Self {
427        Self {
428            routes: HashMap::new(),
429            registry,
430            languages,
431            beans,
432            runtime: None,
433            global_error_handler: None,
434            crash_notifier: None,
435            tracing_enabled: false,
436            tracer_detail_level: DetailLevel::Minimal,
437            tracer_metrics: None,
438            platform_service,
439            function_invoker: None,
440            health_registry: None,
441        }
442    }
443
444    pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
445        self.function_invoker = Some(function_invoker);
446        self
447    }
448
449    pub fn set_health_registry(&mut self, registry: Arc<HealthCheckRegistry>) {
450        self.health_registry = Some(registry);
451    }
452
453    pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
454        self.function_invoker = Some(invoker);
455    }
456
457    /// Set runtime handle for ProducerContext creation.
458    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
459        self.runtime = Some(Arc::downgrade(&runtime));
460    }
461
462    /// Set the crash notifier for supervision.
463    ///
464    /// When set, the controller will send a [`CrashNotification`] whenever
465    /// a consumer crashes.
466    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
467        self.crash_notifier = Some(tx);
468    }
469
470    /// Set a global error handler applied to all routes without a per-route handler.
471    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
472        self.global_error_handler = Some(config);
473    }
474
475    /// Configure tracing for this route controller.
476    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
477        self.tracing_enabled = config.enabled;
478        self.tracer_detail_level = config.detail_level.clone();
479        self.tracer_metrics = config.metrics_collector.clone();
480    }
481
482    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
483        let mut producer_ctx = ProducerContext::new();
484        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
485            producer_ctx = producer_ctx.with_runtime(runtime);
486        }
487        Ok(producer_ctx)
488    }
489
490    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
491    fn resolve_error_handler(
492        &self,
493        config: ErrorHandlerConfig,
494        producer_ctx: &ProducerContext,
495        component_ctx: &dyn ComponentContext,
496    ) -> Result<ErrorHandlerLayer, CamelError> {
497        // Resolve DLC URI → producer.
498        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
499            let parsed = parse_uri(uri)?;
500            let component = component_ctx
501                .resolve_component(&parsed.scheme)
502                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
503            let endpoint = component.create_endpoint(uri, component_ctx)?;
504            Some(endpoint.create_producer(producer_ctx)?)
505        } else {
506            None
507        };
508
509        // Resolve per-policy `handled_by` URIs.
510        let mut resolved_policies = Vec::new();
511        for policy in config.policies {
512            let handler_producer = if let Some(ref uri) = policy.handled_by {
513                let parsed = parse_uri(uri)?;
514                let component = component_ctx
515                    .resolve_component(&parsed.scheme)
516                    .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
517                let endpoint = component.create_endpoint(uri, component_ctx)?;
518                Some(endpoint.create_producer(producer_ctx)?)
519            } else {
520                None
521            };
522            resolved_policies.push((policy, handler_producer));
523        }
524
525        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
526    }
527
528    /// Resolve a `UnitOfWorkConfig` into an `(ExchangeUoWLayer, Arc<AtomicU64>)`.
529    /// Returns `Err` if any hook URI cannot be resolved.
530    fn resolve_uow_layer(
531        &self,
532        config: &UnitOfWorkConfig,
533        producer_ctx: &ProducerContext,
534        component_ctx: &dyn ComponentContext,
535        counter: Option<Arc<AtomicU64>>,
536    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
537        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
538            let parsed = parse_uri(uri)?;
539            let component = component_ctx
540                .resolve_component(&parsed.scheme)
541                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
542            let endpoint = component.create_endpoint(uri, component_ctx)?;
543            endpoint.create_producer(producer_ctx).map_err(|e| {
544                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
545            })
546        };
547
548        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
549        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
550
551        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
552        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
553        Ok((layer, counter))
554    }
555
556    /// Resolve BuilderSteps into BoxProcessors.
557    pub(crate) fn resolve_steps(
558        &self,
559        steps: Vec<BuilderStep>,
560        producer_ctx: &ProducerContext,
561        registry: &Arc<std::sync::Mutex<Registry>>,
562        route_id: Option<&str>,
563        staging_mode: &super::step_resolution::FunctionStagingMode,
564    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
565        let component_ctx = Arc::new(ControllerComponentContext::new(
566            Arc::clone(registry),
567            Arc::clone(&self.languages),
568            self.tracer_metrics
569                .clone()
570                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
571            Arc::clone(&self.platform_service),
572            self.health_registry(),
573            route_id.map(|s| s.to_string()),
574        ));
575
576        super::step_resolution::resolve_steps(
577            steps,
578            producer_ctx,
579            registry,
580            &self.languages,
581            &self.beans,
582            self.function_invoker.clone(),
583            component_ctx,
584            route_id,
585            staging_mode,
586        )
587    }
588
589    /// Add a route definition to the controller.
590    ///
591    /// Steps are resolved immediately using the registry.
592    ///
593    /// # Errors
594    ///
595    /// Returns an error if:
596    /// - A route with the same ID already exists
597    /// - Step resolution fails
598    pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
599        let route_id = definition.route_id().to_string();
600
601        if self.routes.contains_key(&route_id) {
602            return Err(CamelError::RouteError(format!(
603                "Route '{}' already exists",
604                route_id
605            )));
606        }
607
608        info!(route_id = %route_id, "Adding route to controller");
609
610        let prepared = match self.build_managed_route(
611            definition,
612            &super::step_resolution::FunctionStagingMode::DirectAdd,
613        ) {
614            Ok(prepared) => prepared,
615            Err(err) => {
616                self.discard_function_staging();
617                return Err(err);
618            }
619        };
620
621        if let Some(invoker) = &self.function_invoker
622            && let Err(err) = invoker.commit_staged().await
623        {
624            invoker.discard_staging(0);
625            return Err(CamelError::Config(err.to_string()));
626        }
627
628        self.routes
629            .insert(prepared.route_id.clone(), prepared.managed);
630
631        Ok(())
632    }
633
    /// Builds the complete [`ManagedRoute`] for a definition without registering it.
    ///
    /// Steps are resolved to processors and composed into a pipeline. Optional layers are then
    /// wrapped around it in this order, each one enclosing the previous result:
    ///
    /// 1. circuit breaker (innermost),
    /// 2. security policy,
    /// 3. error handler — the route's own, otherwise the controller's global one,
    /// 4. unit of work (outermost), which also yields the in-flight counter.
    ///
    /// If the first top-level aggregate step requires a split (see
    /// `find_top_level_aggregate_requiring_split`), the steps before and after it are compiled
    /// into separate pre/post pipelines stored in `aggregate_split`. The main pipeline then starts
    /// from an `IdentityProcessor`, around which the layers above are still applied.
    ///
    /// The returned route has no task handles or channel sender and carries fresh cancellation
    /// tokens.
    ///
    /// # Errors
    ///
    /// Propagates failures from producer-context creation, step resolution, error-handler
    /// resolution and unit-of-work resolution.
    fn build_managed_route(
        &self,
        definition: RouteDefinition,
        staging_mode: &super::step_resolution::FunctionStagingMode,
    ) -> Result<PreparedRoute, CamelError> {
        let route_id = definition.route_id().to_string();

        // Capture the introspection metadata before the definition is taken apart below.
        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            security_policy,
            security_authenticator,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        let producer_ctx = self.build_producer_context()?;

        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_requiring_split(&steps) {
            Some((idx, agg_config)) => {
                // Split the step list around the aggregate: [..idx) | aggregate | (idx..].
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                // The aggregate step itself is dropped; its config was already cloned into
                // `agg_config` by the finder.
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(
                    pre_steps,
                    &producer_ctx,
                    &self.registry,
                    Some(&route_id),
                    staging_mode,
                )?;
                // Body-type contracts are discarded for the pre/post pipelines.
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(
                    post_steps,
                    &producer_ctx,
                    &self.registry,
                    Some(&route_id),
                    staging_mode,
                )?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // No processors for the main pipeline: it becomes an identity processor below.
                vec![]
            }
            None => self.resolve_steps(
                steps,
                &producer_ctx,
                &self.registry,
                Some(&route_id),
                staging_mode,
            )?,
        };
        let route_id_for_tracing = route_id.clone();
        // An empty processor list (split route, or a route without steps) yields a pass-through.
        let mut pipeline = if processors_with_contracts.is_empty() {
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        // Layer 1 (innermost): circuit breaker.
        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Layer 2: security policy. The config is cloned because the original is also stored on
        // the managed route.
        if let Some(sp_config) = security_policy.clone() {
            let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
            pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
        }

        // Layer 3: error handler — the per-route config wins over the global fallback.
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
                self.health_registry(),
                Some(route_id.clone()),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Layer 4 (outermost): unit of work; its counter becomes the route's in-flight counter.
        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
                self.health_registry(),
                Some(route_id.clone()),
            );
            let (uow_layer, counter) =
                self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        // The route starts with nothing running: no task handles, no channel, fresh tokens.
        Ok(PreparedRoute {
            route_id,
            managed: ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
                security_policy,
                security_authenticator,
            },
        })
    }
784
785    pub(crate) fn insert_prepared_route(
786        &mut self,
787        prepared: PreparedRoute,
788    ) -> Result<(), CamelError> {
789        if self.routes.contains_key(&prepared.route_id) {
790            return Err(CamelError::RouteError(format!(
791                "Route '{}' already exists",
792                prepared.route_id
793            )));
794        }
795        self.routes
796            .insert(prepared.route_id.clone(), prepared.managed);
797        Ok(())
798    }
799
800    pub async fn add_route_with_generation(
801        &mut self,
802        definition: RouteDefinition,
803        generation: u64,
804    ) -> Result<(), CamelError> {
805        let route_id = definition.route_id().to_string();
806
807        if self.routes.contains_key(&route_id) {
808            return Err(CamelError::RouteError(format!(
809                "Route '{}' already exists",
810                route_id
811            )));
812        }
813
814        info!(route_id = %route_id, generation, "Adding route to controller with generation");
815
816        let prepared = self.build_managed_route(
817            definition,
818            &super::step_resolution::FunctionStagingMode::HotReload { generation },
819        )?;
820
821        self.routes
822            .insert(prepared.route_id.clone(), prepared.managed);
823
824        Ok(())
825    }
826
827    pub(crate) fn prepare_route_definition_with_generation(
828        &self,
829        definition: RouteDefinition,
830        generation: u64,
831    ) -> Result<PreparedRoute, CamelError> {
832        self.build_managed_route(
833            definition,
834            &super::step_resolution::FunctionStagingMode::HotReload { generation },
835        )
836    }
837
838    pub async fn remove_route_preserving_functions(
839        &mut self,
840        route_id: &str,
841    ) -> Result<(), CamelError> {
842        let managed = self.routes.get(route_id).ok_or_else(|| {
843            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
844        })?;
845        if handle_is_running(&managed.consumer_handle)
846            || handle_is_running(&managed.pipeline_handle)
847        {
848            return Err(CamelError::RouteError(format!(
849                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
850                route_id,
851                inferred_lifecycle_label(managed)
852            )));
853        }
854        self.routes.remove(route_id);
855        if let Some(reg) = &self.health_registry {
856            reg.unregister_for_route(route_id);
857        }
858        info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
859        Ok(())
860    }
861
862    pub fn compile_route_definition(
863        &self,
864        def: RouteDefinition,
865    ) -> Result<BoxProcessor, CamelError> {
866        let route_id = def.route_id().to_string();
867
868        let producer_ctx = self.build_producer_context()?;
869
870        let processors_with_contracts = self.resolve_steps(
871            def.steps,
872            &producer_ctx,
873            &self.registry,
874            Some(&route_id),
875            &super::step_resolution::FunctionStagingMode::DryCompile,
876        )?;
877        let mut pipeline = compose_traced_pipeline_with_contracts(
878            processors_with_contracts,
879            &route_id,
880            self.tracing_enabled,
881            self.tracer_detail_level.clone(),
882            self.tracer_metrics.clone(),
883        );
884
885        if let Some(cb_config) = def.circuit_breaker {
886            let cb_layer = CircuitBreakerLayer::new(cb_config);
887            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
888        }
889
890        if let Some(sp_config) = def.security_policy {
891            let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
892            pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
893        }
894
895        let eh_config = def
896            .error_handler
897            .clone()
898            .or_else(|| self.global_error_handler.clone());
899        if let Some(config) = eh_config {
900            let component_ctx = ControllerComponentContext::new(
901                Arc::clone(&self.registry),
902                Arc::clone(&self.languages),
903                self.tracer_metrics
904                    .clone()
905                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
906                Arc::clone(&self.platform_service),
907                self.health_registry(),
908                Some(route_id.clone()),
909            );
910            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
911            pipeline = BoxProcessor::new(layer.layer(pipeline));
912        }
913
914        // Apply UoW layer outermost
915        if let Some(uow_config) = &def.unit_of_work {
916            let existing_counter = self
917                .routes
918                .get(&route_id)
919                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
920
921            let component_ctx = ControllerComponentContext::new(
922                Arc::clone(&self.registry),
923                Arc::clone(&self.languages),
924                self.tracer_metrics
925                    .clone()
926                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
927                Arc::clone(&self.platform_service),
928                self.health_registry(),
929                Some(route_id.clone()),
930            );
931
932            let (uow_layer, _counter) = self.resolve_uow_layer(
933                uow_config,
934                &producer_ctx,
935                &component_ctx,
936                existing_counter,
937            )?;
938
939            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
940        }
941
942        Ok(pipeline)
943    }
944
945    pub fn compile_route_definition_with_generation(
946        &self,
947        def: RouteDefinition,
948        generation: u64,
949    ) -> Result<BoxProcessor, CamelError> {
950        let route_id = def.route_id().to_string();
951
952        let producer_ctx = self.build_producer_context()?;
953
954        let processors_with_contracts = self.resolve_steps(
955            def.steps,
956            &producer_ctx,
957            &self.registry,
958            Some(&route_id),
959            &super::step_resolution::FunctionStagingMode::HotReload { generation },
960        )?;
961        let mut pipeline = compose_traced_pipeline_with_contracts(
962            processors_with_contracts,
963            &route_id,
964            self.tracing_enabled,
965            self.tracer_detail_level.clone(),
966            self.tracer_metrics.clone(),
967        );
968
969        if let Some(cb_config) = def.circuit_breaker {
970            let cb_layer = CircuitBreakerLayer::new(cb_config);
971            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
972        }
973
974        if let Some(sp_config) = def.security_policy {
975            let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
976            pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
977        }
978
979        let eh_config = def
980            .error_handler
981            .clone()
982            .or_else(|| self.global_error_handler.clone());
983        if let Some(config) = eh_config {
984            let component_ctx = ControllerComponentContext::new(
985                Arc::clone(&self.registry),
986                Arc::clone(&self.languages),
987                self.tracer_metrics
988                    .clone()
989                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
990                Arc::clone(&self.platform_service),
991                self.health_registry(),
992                Some(route_id.clone()),
993            );
994            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
995            pipeline = BoxProcessor::new(layer.layer(pipeline));
996        }
997
998        if let Some(uow_config) = &def.unit_of_work {
999            let existing_counter = self
1000                .routes
1001                .get(&route_id)
1002                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
1003
1004            let component_ctx = ControllerComponentContext::new(
1005                Arc::clone(&self.registry),
1006                Arc::clone(&self.languages),
1007                self.tracer_metrics
1008                    .clone()
1009                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1010                Arc::clone(&self.platform_service),
1011                self.health_registry(),
1012                Some(route_id.clone()),
1013            );
1014
1015            let (uow_layer, _counter) = self.resolve_uow_layer(
1016                uow_config,
1017                &producer_ctx,
1018                &component_ctx,
1019                existing_counter,
1020            )?;
1021
1022            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1023        }
1024
1025        Ok(pipeline)
1026    }
1027
1028    /// Remove a route from the controller map.
1029    ///
1030    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
1031    /// Returns an error if the route is still running or does not exist.
1032    /// Does not cancel any running tasks — call `stop_route` first.
1033    pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1034        let managed = self.routes.get(route_id).ok_or_else(|| {
1035            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
1036        })?;
1037        if handle_is_running(&managed.consumer_handle)
1038            || handle_is_running(&managed.pipeline_handle)
1039        {
1040            return Err(CamelError::RouteError(format!(
1041                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
1042                route_id,
1043                inferred_lifecycle_label(managed)
1044            )));
1045        }
1046        if let Some(invoker) = &self.function_invoker {
1047            for (id, rid) in self.collect_function_refs(route_id) {
1048                if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
1049                    warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
1050                }
1051            }
1052        }
1053        self.routes.remove(route_id);
1054        if let Some(reg) = &self.health_registry {
1055            reg.unregister_for_route(route_id);
1056        }
1057        info!(route_id = %route_id, "Route removed from controller");
1058        Ok(())
1059    }
1060
1061    fn collect_function_refs(
1062        &self,
1063        route_id: &str,
1064    ) -> Vec<(camel_api::FunctionId, Option<String>)> {
1065        self.function_invoker
1066            .as_ref()
1067            .map(|invoker| invoker.function_refs_for_route(route_id))
1068            .unwrap_or_default()
1069    }
1070
1071    fn discard_function_staging(&self) {
1072        if let Some(invoker) = &self.function_invoker {
1073            invoker.discard_staging(0);
1074        }
1075    }
1076
1077    /// Returns the number of routes in the controller.
1078    pub fn route_count(&self) -> usize {
1079        self.routes.len()
1080    }
1081
1082    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1083        self.routes.get(route_id).map(|r| {
1084            r.in_flight
1085                .as_ref()
1086                .map_or(0, |c| c.load(Ordering::Relaxed))
1087        })
1088    }
1089
1090    /// Returns `true` if a route with the given ID exists.
1091    pub fn route_exists(&self, route_id: &str) -> bool {
1092        self.routes.contains_key(route_id)
1093    }
1094
1095    /// Returns all route IDs.
1096    pub fn route_ids(&self) -> Vec<String> {
1097        self.routes.keys().cloned().collect()
1098    }
1099
1100    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
1101        self.routes
1102            .get(route_id)
1103            .and_then(|m| m.definition.source_hash())
1104    }
1105
1106    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
1107    pub fn auto_startup_route_ids(&self) -> Vec<String> {
1108        let mut pairs: Vec<(String, i32)> = self
1109            .routes
1110            .iter()
1111            .filter(|(_, managed)| managed.definition.auto_startup())
1112            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1113            .collect();
1114        pairs.sort_by_key(|(_, order)| *order);
1115        pairs.into_iter().map(|(id, _)| id).collect()
1116    }
1117
1118    /// Returns route IDs sorted by shutdown order (startup order descending).
1119    pub fn shutdown_route_ids(&self) -> Vec<String> {
1120        let mut pairs: Vec<(String, i32)> = self
1121            .routes
1122            .iter()
1123            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1124            .collect();
1125        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1126        pairs.into_iter().map(|(id, _)| id).collect()
1127    }
1128
1129    /// Atomically swap the pipeline of a route.
1130    ///
1131    /// In-flight requests finish with the old pipeline (kept alive by Arc).
1132    /// New requests immediately use the new pipeline.
1133    pub fn swap_pipeline(
1134        &self,
1135        route_id: &str,
1136        new_pipeline: BoxProcessor,
1137    ) -> Result<(), CamelError> {
1138        let managed = self
1139            .routes
1140            .get(route_id)
1141            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1142
1143        if managed.aggregate_split.is_some() {
1144            tracing::warn!(
1145                route_id = %route_id,
1146                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
1147            );
1148        }
1149
1150        managed
1151            .pipeline
1152            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1153        info!(route_id = %route_id, "Pipeline swapped atomically");
1154        Ok(())
1155    }
1156
1157    /// Returns the from_uri of a route, if it exists.
1158    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1159        self.routes.get(route_id).map(|r| r.from_uri.clone())
1160    }
1161
1162    /// Get a clone of the current pipeline for a route.
1163    ///
1164    /// This is useful for testing and introspection.
1165    /// Returns `None` if the route doesn't exist.
1166    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1167        self.routes
1168            .get(route_id)
1169            .map(|r| r.pipeline.load().0.clone())
1170    }
1171
1172    /// Internal stop implementation that can set custom status.
1173    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1174        super::consumer_management::stop_route_internal(
1175            &mut self.routes,
1176            route_id,
1177            DEFAULT_SHUTDOWN_TIMEOUT,
1178        )
1179        .await
1180    }
1181
1182    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1183        self.start_route(route_id).await
1184    }
1185
1186    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1187        self.stop_route(route_id).await
1188    }
1189}
1190
1191#[async_trait::async_trait]
1192impl RouteController for DefaultRouteController {
1193    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
            // Overview: validates the current task-handle state, builds a consumer for
            // the route's `from_uri`, spawns the pipeline task first and the consumer
            // task second, then stores both handles plus a channel sender on the
            // managed route.
            //
            // Returns Ok(()) without doing anything when both tasks are already
            // running. Returns a RouteError when the route is unknown or when only one
            // of the two tasks is running. Errors from consumer creation are propagated.
1194        // Check if route exists and can be started.
1195        {
1196            let managed = self
1197                .routes
1198                .get_mut(route_id)
1199                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1200
1201            let consumer_running = handle_is_running(&managed.consumer_handle);
1202            let pipeline_running = handle_is_running(&managed.pipeline_handle);
1203            if consumer_running && pipeline_running {
1204                return Ok(());
1205            }
1206            if !consumer_running && pipeline_running {
1207                return Err(CamelError::RouteError(format!(
1208                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
1209                    route_id
1210                )));
1211            }
1212            if consumer_running && !pipeline_running {
1213                return Err(CamelError::RouteError(format!(
1214                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
1215                    route_id
1216                )));
1217            }
1218        }
1219
1220        info!(route_id = %route_id, "Starting route");
1221
1222        // Get the resolved route info
1223        let (from_uri, pipeline, concurrency) = {
1224            let managed = self
1225                .routes
1226                .get(route_id)
1227                .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1228            (
1229                managed.from_uri.clone(),
1230                Arc::clone(&managed.pipeline),
1231                managed.concurrency.clone(),
1232            )
1233        };
1234
1235        // Clone crash notifier for consumer task
1236        let crash_notifier = self.crash_notifier.clone();
1237        let runtime_for_consumer = self.runtime.clone();
1238
1239        let (mut consumer, consumer_concurrency) =
1240            super::consumer_management::create_route_consumer(
1241                &self.registry,
1242                &from_uri,
1243                &ControllerComponentContext::new(
1244                    Arc::clone(&self.registry),
1245                    Arc::clone(&self.languages),
1246                    self.tracer_metrics
1247                        .clone()
1248                        .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1249                    Arc::clone(&self.platform_service),
1250                    self.health_registry(),
1251                    Some(route_id.to_string()),
1252                ),
1253            )?;
1254
1255        // Resolve effective concurrency: route override > consumer default
1256        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1257
1258        // Get the managed route for mutation
1259        let managed = self
1260            .routes
1261            .get_mut(route_id)
1262            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1263
1264        // Wire security context before spawning consumer
1265        if let (Some(sp_config), Some(authenticator)) = (
1266            managed.security_policy.as_ref(),
1267            managed.security_authenticator.as_ref(),
1268        ) {
1269            use camel_component_api::SecurityContext;
1270            let sec_ctx =
1271                SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
1272            consumer.set_security_context(sec_ctx);
1273        }
1274
1275        // Create channel for consumer to send exchanges
1276        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1277        // Create child tokens for independent lifecycle control
1278        let consumer_cancel = managed.consumer_cancel_token.child_token();
1279        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1280        // Clone sender for storage (to reuse on resume)
1281        let tx_for_storage = tx.clone();
1282        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1283
1284        // --- Aggregator v2: check for aggregate route with timeout ---
1285        let managed = self
1286            .routes
1287            .get_mut(route_id)
1288            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1289
1290        if let Some(split) = managed.aggregate_split.as_ref() {
                // `late_tx` is handed to the aggregator service; whatever it sends
                // arrives on `late_rx` and is run through the post-pipeline below.
                // NOTE(review): presumably these are exchanges the aggregator completes
                // on its own (e.g. by timeout) — confirm against AggregatorService.
1291            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
1292
1293            let route_cancel_clone = pipeline_cancel.clone();
1294            let svc = AggregatorService::new(
1295                split.agg_config.clone(),
1296                late_tx,
1297                Arc::clone(&self.languages),
1298                route_cancel_clone,
1299            );
1300            let agg = Arc::new(std::sync::Mutex::new(svc));
1301
1302            let pipeline_cancel_for_monitor = pipeline_cancel.clone();
1303            let agg_for_monitor = Arc::clone(&agg);
1304
1305            managed.agg_service = Some(Arc::clone(&agg));
1306
1307            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
1308            let pre_pipeline = Arc::clone(&split.pre_pipeline);
1309            let post_pipeline = Arc::clone(&split.post_pipeline);
1310
1311            // Spawn biased select forward loop
1312            let pipeline_handle = tokio::spawn(async move {
1313                loop {
1314                    tokio::select! {
                            // `biased` makes select! poll the branches in source order:
                            // late exchanges, then consumer envelopes, then cancellation.
1315                        biased;
1316
1317                        late_ex = async {
1318                            let mut rx = late_rx.lock().await;
1319                            rx.recv().await
1320                        } => {
1321                            match late_ex {
1322                                Some(ex) => {
1323                                    let pipe = post_pipeline.load();
1324                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
1325                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
1326                                    }
1327                                }
1328                                None => return,
1329                            }
1330                        }
1331
1332                        envelope_opt = rx.recv() => {
1333                            match envelope_opt {
1334                                Some(envelope) => {
1335                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
1336                                    let pre_pipe = pre_pipeline.load();
1337                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
1338                                        Ok(ex) => ex,
1339                                        Err(e) => {
1340                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1341                                            continue;
1342                                        }
1343                                    };
1344
                                        // The std mutex guard is a temporary of the `let`
                                        // statement: the service is cloned and the lock is
                                        // released before the `.await` below.
1345                                    let ex = {
1346                                        let cloned_svc = agg
1347                                            .lock()
1348                                            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
1349                                            .clone();
1350                                        cloned_svc.oneshot(ex).await
1351                                    };
1352
1353                                    match ex {
1354                                        Ok(ex) => {
                                                // Exchanges flagged by `is_pending` skip the
                                                // post-pipeline and are handed back to the
                                                // caller unchanged.
1355                                            if !is_pending(&ex) {
1356                                                let post_pipe = post_pipeline.load();
1357                                                let out = post_pipe.0.clone().oneshot(ex).await;
1358                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
1359                                            } else if let Some(tx) = reply_tx {
1360                                                let _ = tx.send(Ok(ex));
1361                                            }
1362                                        }
1363                                        Err(e) => {
1364                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1365                                        }
1366                                    }
1367                                }
1368                                None => return,
1369                            }
1370                        }
1371
                            // On cancellation: force-complete the aggregator, run whatever
                            // is already queued on the late channel through the
                            // post-pipeline (errors ignored), then leave the loop.
1372                        _ = pipeline_cancel.cancelled() => {
1373                            {
1374                                let guard = agg
1375                                    .lock()
1376                                    .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
1377                                guard.force_complete_all();
1378                            }
1379                            let mut rx_guard = late_rx.lock().await;
1380                            while let Ok(late_ex) = rx_guard.try_recv() {
1381                                let pipe = post_pipeline.load();
1382                                let _ = pipe.0.clone().oneshot(late_ex).await;
1383                            }
1384                            break;
1385                        }
1386                    }
1387                }
1388            });
1389            #[cfg(test)]
1390            emit_start_route_event("pipeline_spawned");
1391
1392            // Start consumer after pipeline loop is spawned to avoid startup races
1393            // where consumers emit exchanges before the route pipeline begins polling.
1394            let consumer_handle = super::consumer_management::spawn_consumer_task(
1395                route_id.to_string(),
1396                consumer,
1397                consumer_ctx,
1398                crash_notifier,
1399                runtime_for_consumer,
1400                false,
1401            );
1402
1403            // Extend the stored consumer handle through aggregate force-completion.
1404            // While this monitor drains pending buckets, handle_is_running still reports
1405            // the Route as running because forced exchanges may still be in post-pipeline.
1406            let force_on_stop = agg_for_monitor
1407                .lock()
1408                .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
1409                .config()
1410                .force_completion_on_stop;
                // The handle stored on the route is this wrapper task, which first waits
                // for the real consumer task to finish (its join result is ignored).
1411            let consumer_handle = tokio::spawn(async move {
1412                let _ = consumer_handle.await;
1413                if !pipeline_cancel_for_monitor.is_cancelled() {
1414                    let guard = agg_for_monitor
1415                        .lock()
1416                        .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
1417                    guard.force_complete_all();
1418                    drop(guard);
1419                    if force_on_stop {
1420                        pipeline_cancel_for_monitor.cancel();
1421                    }
1422                }
1423            });
1424            #[cfg(test)]
1425            emit_start_route_event("consumer_spawned");
1426
1427            let managed = self
1428                .routes
1429                .get_mut(route_id)
1430                .expect("invariant: route must exist"); // allow-unwrap
1431            managed.consumer_handle = Some(consumer_handle);
1432            managed.pipeline_handle = Some(pipeline_handle);
1433            managed.channel_sender = Some(tx_for_storage);
1434
1435            info!(route_id = %route_id, "Route started (aggregate with timeout)");
1436            return Ok(());
1437        }
1438        // --- End aggregator v2 branch ---
1439
1440        // Spawn pipeline task with its own cancellation token
1441        let pipeline_handle = match effective_concurrency {
1442            ConcurrencyModel::Sequential => {
1443                tokio::spawn(async move {
1444                    loop {
1445                        // Use select! to exit promptly on cancellation even when idle
1446                        let envelope = tokio::select! {
1447                            envelope = rx.recv() => match envelope {
1448                                Some(e) => e,
1449                                None => return, // Channel closed
1450                            },
1451                            _ = pipeline_cancel.cancelled() => {
1452                                // Cancellation requested - exit gracefully
1453                                return;
1454                            }
1455                        };
1456                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1457
1458                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
1459                        let mut pipeline = pipeline.load().0.clone();
1460
                            // A readiness failure is reported to the caller (if any) and
                            // then ends this whole task via `return`, not just the
                            // current exchange.
1461                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1462                            if let Some(tx) = reply_tx {
1463                                let _ = tx.send(Err(e));
1464                            }
1465                            return;
1466                        }
1467
1468                        let result = pipeline.call(exchange).await;
1469                        if let Some(tx) = reply_tx {
1470                            let _ = tx.send(result);
1471                        } else if let Err(ref e) = result
1472                            && !matches!(e, CamelError::Stopped)
1473                        {
1474                            error!("Pipeline error: {e}");
1475                        }
1476                    }
1477                })
1478            }
1479            ConcurrencyModel::Concurrent { max } => {
1480                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1481                tokio::spawn(async move {
1482                    loop {
1483                        // Use select! to exit promptly on cancellation even when idle
1484                        let envelope = tokio::select! {
1485                            envelope = rx.recv() => match envelope {
1486                                Some(e) => e,
1487                                None => return, // Channel closed
1488                            },
1489                            _ = pipeline_cancel.cancelled() => {
1490                                // Cancellation requested - exit gracefully
1491                                return;
1492                            }
1493                        };
1494                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1495                        let pipe_ref = Arc::clone(&pipeline);
1496                        let sem = sem.clone();
1497                        let cancel = pipeline_cancel.clone();
                            // One task is spawned per envelope. The optional permit is
                            // acquired inside that task, so `max` bounds concurrent
                            // pipeline calls rather than the number of spawned tasks.
1498                        tokio::spawn(async move {
1499                            // Acquire semaphore permit if bounded
1500                            let _permit = match &sem {
1501                                Some(s) => Some(s.acquire().await.expect("semaphore closed")), // allow-unwrap
1502                                None => None,
1503                            };
1504
1505                            // Load current pipeline from ArcSwap
1506                            let mut pipe = pipe_ref.load().0.clone();
1507
1508                            // Wait for service ready with circuit breaker backoff
1509                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1510                                if let Some(tx) = reply_tx {
1511                                    let _ = tx.send(Err(e));
1512                                }
1513                                return;
1514                            }
1515
1516                            let result = pipe.call(exchange).await;
1517                            if let Some(tx) = reply_tx {
1518                                let _ = tx.send(result);
1519                            } else if let Err(ref e) = result
1520                                && !matches!(e, CamelError::Stopped)
1521                            {
1522                                error!("Pipeline error: {e}");
1523                            }
1524                        });
1525                    }
1526                })
1527            }
1528        };
1529        #[cfg(test)]
1530        emit_start_route_event("pipeline_spawned");
1531
1532        // Start consumer after pipeline task is spawned to minimize the chance of
1533        // fire-and-forget events being produced before the pipeline loop is active.
1534        let consumer_handle = super::consumer_management::spawn_consumer_task(
1535            route_id.to_string(),
1536            consumer,
1537            consumer_ctx,
1538            crash_notifier,
1539            runtime_for_consumer,
1540            false,
1541        );
1542        #[cfg(test)]
1543        emit_start_route_event("consumer_spawned");
1544
1545        // Store handles and update status
1546        let managed = self
1547            .routes
1548            .get_mut(route_id)
1549            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1550        managed.consumer_handle = Some(consumer_handle);
1551        managed.pipeline_handle = Some(pipeline_handle);
1552        managed.channel_sender = Some(tx_for_storage);
1553
1554        info!(route_id = %route_id, "Route started");
1555        Ok(())
1556    }
1557
1558    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1559        self.stop_route_internal(route_id).await?;
1560        if let Some(reg) = &self.health_registry {
1561            reg.unregister_for_route(route_id);
1562        }
1563        Ok(())
1564    }
1565
1566    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1567        self.stop_route(route_id).await?;
1568        tokio::time::sleep(Duration::from_millis(100)).await;
1569        self.start_route(route_id).await
1570    }
1571
1572    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1573        // Check route exists and state.
1574        let managed = self
1575            .routes
1576            .get_mut(route_id)
1577            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1578
1579        let consumer_running = handle_is_running(&managed.consumer_handle);
1580        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1581
1582        // Can only suspend from active started state.
1583        if !consumer_running || !pipeline_running {
1584            return Err(CamelError::RouteError(format!(
1585                "Cannot suspend route '{}' with execution lifecycle {}",
1586                route_id,
1587                inferred_lifecycle_label(managed)
1588            )));
1589        }
1590
1591        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1592
1593        // Cancel consumer token only (keep pipeline running)
1594        let managed = self
1595            .routes
1596            .get_mut(route_id)
1597            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1598        managed.consumer_cancel_token.cancel();
1599
1600        // Take and join consumer handle
1601        let managed = self
1602            .routes
1603            .get_mut(route_id)
1604            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1605        let consumer_handle = managed.consumer_handle.take();
1606
1607        // Wait for consumer task to complete with timeout
1608        let timeout_result = tokio::time::timeout(DEFAULT_SHUTDOWN_TIMEOUT, async {
1609            if let Some(handle) = consumer_handle {
1610                let _ = handle.await;
1611            }
1612        })
1613        .await;
1614
1615        if timeout_result.is_err() {
1616            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1617        }
1618
1619        // Get the managed route again (can't hold across await)
1620        let managed = self
1621            .routes
1622            .get_mut(route_id)
1623            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1624
1625        // Create fresh cancellation token for consumer (for resume)
1626        managed.consumer_cancel_token = CancellationToken::new();
1627
1628        info!(route_id = %route_id, "Route suspended (pipeline still running)");
1629        Ok(())
1630    }
1631
1632    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1633        // Check route exists and is Suspended-equivalent execution state.
1634        let managed = self
1635            .routes
1636            .get(route_id)
1637            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1638
1639        let consumer_running = handle_is_running(&managed.consumer_handle);
1640        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1641        if consumer_running || !pipeline_running {
1642            return Err(CamelError::RouteError(format!(
1643                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1644                route_id,
1645                inferred_lifecycle_label(managed)
1646            )));
1647        }
1648
1649        // Get the stored channel sender (must exist for a suspended route)
1650        let sender = managed.channel_sender.clone().ok_or_else(|| {
1651            CamelError::RouteError("Suspended route has no channel sender".into())
1652        })?;
1653
1654        // Get from_uri and concurrency for creating new consumer
1655        let from_uri = managed.from_uri.clone();
1656
1657        info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1658
1659        let (mut consumer, _) = super::consumer_management::create_route_consumer(
1660            &self.registry,
1661            &from_uri,
1662            &ControllerComponentContext::new(
1663                Arc::clone(&self.registry),
1664                Arc::clone(&self.languages),
1665                self.tracer_metrics
1666                    .clone()
1667                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1668                Arc::clone(&self.platform_service),
1669                self.health_registry(),
1670                Some(route_id.to_string()),
1671            ),
1672        )?;
1673
1674        // Wire security context before spawning consumer
1675        let managed = self
1676            .routes
1677            .get(route_id)
1678            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1679        if let (Some(sp_config), Some(authenticator)) = (
1680            managed.security_policy.as_ref(),
1681            managed.security_authenticator.as_ref(),
1682        ) {
1683            use camel_component_api::SecurityContext;
1684            let sec_ctx =
1685                SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
1686            consumer.set_security_context(sec_ctx);
1687        }
1688
1689        // Get the managed route for mutation
1690        let managed = self
1691            .routes
1692            .get_mut(route_id)
1693            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1694
1695        // Create child token for consumer lifecycle
1696        let consumer_cancel = managed.consumer_cancel_token.child_token();
1697
1698        let crash_notifier = self.crash_notifier.clone();
1699        let runtime_for_consumer = self.runtime.clone();
1700
1701        // Create ConsumerContext with the stored sender
1702        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1703
1704        // Spawn consumer task
1705        let consumer_handle = super::consumer_management::spawn_consumer_task(
1706            route_id.to_string(),
1707            consumer,
1708            consumer_ctx,
1709            crash_notifier,
1710            runtime_for_consumer,
1711            true,
1712        );
1713
1714        // Store consumer handle and update status
1715        let managed = self
1716            .routes
1717            .get_mut(route_id)
1718            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1719        managed.consumer_handle = Some(consumer_handle);
1720
1721        info!(route_id = %route_id, "Route resumed");
1722        Ok(())
1723    }
1724
1725    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1726        // Only start routes where auto_startup() == true
1727        // Sort by startup_order() ascending before starting
1728        let route_ids: Vec<String> = {
1729            let mut pairs: Vec<_> = self
1730                .routes
1731                .iter()
1732                .filter(|(_, r)| r.definition.auto_startup())
1733                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1734                .collect();
1735            pairs.sort_by_key(|(_, order)| *order);
1736            pairs.into_iter().map(|(id, _)| id).collect()
1737        };
1738
1739        info!("Starting {} auto-startup routes", route_ids.len());
1740
1741        // Collect errors but continue starting remaining routes
1742        let mut errors: Vec<String> = Vec::new();
1743        for route_id in route_ids {
1744            if let Err(e) = self.start_route(&route_id).await {
1745                errors.push(format!("Route '{}': {}", route_id, e));
1746            }
1747        }
1748
1749        if !errors.is_empty() {
1750            return Err(CamelError::RouteError(format!(
1751                "Failed to start routes: {}",
1752                errors.join(", ")
1753            )));
1754        }
1755
1756        info!("All auto-startup routes started");
1757        Ok(())
1758    }
1759
1760    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1761        // Sort by startup_order descending (reverse order)
1762        let route_ids: Vec<String> = {
1763            let mut pairs: Vec<_> = self
1764                .routes
1765                .iter()
1766                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1767                .collect();
1768            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1769            pairs.into_iter().map(|(id, _)| id).collect()
1770        };
1771
1772        info!("Stopping {} routes", route_ids.len());
1773
1774        for route_id in route_ids {
1775            let _ = self.stop_route(&route_id).await;
1776        }
1777
1778        info!("All routes stopped");
1779        Ok(())
1780    }
1781}
1782
1783#[cfg(test)]
1784#[path = "route_controller_tests.rs"]
1785mod tests;