Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10
/// Five-second default timeout; by its name, the grace period applied when shutting down.
///
/// NOTE(review): no use of this constant is visible in this part of the file —
/// confirm which stop/shutdown paths consume it.
const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::sync::CancellationToken;
15use tower::{Layer, Service, ServiceExt};
16use tracing::{error, info, warn};
17
18use camel_api::UnitOfWorkConfig;
19use camel_api::aggregator::AggregatorConfig;
20use camel_api::error_handler::ErrorHandlerConfig;
21use camel_api::metrics::MetricsCollector;
22use camel_api::{
23    BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
24    NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeCommand,
25    RuntimeHandle,
26};
27use camel_auth::TokenAuthenticator;
28use camel_component_api::{
29    ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
30};
31use camel_endpoint::parse_uri;
32pub use camel_processor::aggregator::SharedLanguageRegistry;
33use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
34use camel_processor::circuit_breaker::CircuitBreakerLayer;
35use camel_processor::error_handler::ErrorHandlerLayer;
36use camel_processor::security_policy_layer::SecurityPolicyLayer;
37
38use crate::health_registry::HealthCheckRegistry;
39use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
40use crate::lifecycle::adapters::route_compiler::{
41    compose_pipeline, compose_traced_pipeline_with_contracts,
42};
43use crate::lifecycle::application::route_definition::{
44    BuilderStep, RouteDefinition, RouteDefinitionInfo,
45};
46use crate::shared::components::domain::Registry;
47use crate::shared::observability::domain::{DetailLevel, TracerConfig};
48use arc_swap::ArcSwap;
49use camel_bean::BeanRegistry;
50
/// Notification sent when a route crashes.
///
/// Values of this type travel over the channel registered through
/// `DefaultRouteController::set_crash_notifier`.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// The ID of the crashed route.
    pub route_id: String,
    /// The error that caused the crash, carried as text.
    pub error: String,
}
59
/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
///
/// # Safety
///
/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
///
/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
///    (creates a new boxed service from the inner clone).
/// 2. The clone is owned by the calling thread and never shared.
/// 3. ArcSwap guarantees we only get & references (no &mut).
///
/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
/// clone() does not mutate shared state and each thread gets an independent copy.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: relies on the contract spelled out in the `# Safety` section above:
// shared references to the wrapper are used exclusively to `clone()` the inner
// processor. NOTE(review): this is a usage convention — the `pub(crate)` field
// does not enforce it, so every in-crate access must keep to clone-only use.
unsafe impl Sync for SyncBoxProcessor {}
76
/// Reference-counted, atomically swappable handle to a boxed processor pipeline.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
78
/// Test-only callback type: receives the static name of an emitted start-route event.
#[cfg(test)]
type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;
81
/// Test-only global slot holding the currently installed start-route event hook.
///
/// Lazily initialised to `None`; guarded by a standard mutex.
#[cfg(test)]
static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
85
/// Installs (`Some`) or clears (`None`) the test-only start-route event hook.
///
/// # Panics
///
/// Panics if the hook mutex is poisoned.
#[cfg(test)]
fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
    let mut slot = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    *slot = hook;
}
92
/// Invokes the installed test hook, if any, with `event`.
///
/// The hook mutex stays locked for the duration of the callback.
///
/// # Panics
///
/// Panics if the hook mutex is poisoned.
#[cfg(test)]
fn emit_start_route_event(event: &'static str) {
    let registered = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    if let Some(hook) = registered.as_ref() {
        hook(event);
    }
}
103
/// Pipelines and aggregator configuration for a route that was split around a
/// top-level aggregate step.
pub(super) struct AggregateSplitInfo {
    /// Pipeline composed from the steps that precede the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step the route was split on.
    pub(super) agg_config: AggregatorConfig,
    /// Pipeline composed from the steps that follow the aggregate step.
    pub(super) post_pipeline: SharedPipeline,
}
110
/// Internal state for a managed route.
pub(super) struct ManagedRoute {
    /// The route definition metadata (for introspection).
    pub(super) definition: RouteDefinitionInfo,
    /// Source endpoint URI.
    pub(super) from_uri: String,
    /// Resolved processor pipeline (wrapped for atomic swap).
    pub(super) pipeline: SharedPipeline,
    /// Concurrency model override (if any).
    pub(super) concurrency: Option<ConcurrencyModel>,
    /// Handle for the consumer task (if running).
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping the consumer task.
    /// This allows independent control of the consumer lifecycle (for suspend/resume).
    pub(super) consumer_cancel_token: CancellationToken,
    /// Cancellation token for stopping the pipeline task.
    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
    pub(super) pipeline_cancel_token: CancellationToken,
    /// Channel sender for sending exchanges to the pipeline.
    /// Stored to allow resuming a suspended route without recreating the channel.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// In-flight exchange counter. `None` when UoW is not configured for this route.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Pre/post pipelines and aggregator config, present only when the route was
    /// split around a top-level aggregate step at build time.
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    /// Shared aggregator service. `None` right after the route is built;
    /// presumably populated when the route starts — TODO confirm against the start path.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
    /// Stored security policy config for use when starting/resuming the consumer.
    pub(super) security_policy: Option<camel_api::security_policy::SecurityPolicyConfig>,
    /// Stored security authenticator for use when starting/resuming the consumer.
    pub(super) security_authenticator: Option<Arc<dyn TokenAuthenticator>>,
}
142
/// A fully built route that has not yet been registered with the controller.
pub(crate) struct PreparedRoute {
    /// ID under which the route will be inserted into the controller's route map.
    pub(crate) route_id: String,
    /// The built route state to insert.
    pub(super) managed: ManagedRoute,
}
147
148pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
149    handle.as_ref().is_some_and(|h| !h.is_finished())
150}
151
152fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
153    match (
154        handle_is_running(&managed.consumer_handle),
155        handle_is_running(&managed.pipeline_handle),
156    ) {
157        (true, true) => "Started",
158        (false, true) => "Suspended",
159        (true, false) => "Stopping",
160        (false, false) => "Stopped",
161    }
162}
163
164fn find_top_level_aggregate_requiring_split(
165    steps: &[BuilderStep],
166) -> Option<(usize, AggregatorConfig)> {
167    for (i, step) in steps.iter().enumerate() {
168        if let BuilderStep::Aggregate { config } = step {
169            if has_timeout_condition(&config.completion) || config.force_completion_on_stop {
170                return Some((i, config.clone()));
171            }
172            break;
173        }
174    }
175    None
176}
177
/// [`ComponentContext`] implementation backed by the controller's shared registries.
pub(crate) struct ControllerComponentContext {
    /// Component registry consulted by `resolve_component`.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Language registry consulted by `resolve_language`.
    languages: SharedLanguageRegistry,
    /// Metrics collector handed out by `metrics`.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service handed out by `platform_service`.
    platform_service: Arc<dyn PlatformService>,
    /// Registry that receives per-route health-check registrations.
    health_registry: Arc<HealthCheckRegistry>,
    /// ID of the route this context was created for, if any.
    route_id: Option<String>,
}
186
impl ControllerComponentContext {
    /// Bundles the given shared services into a component context.
    ///
    /// Every argument is stored as-is; `route_id` is `None` for contexts that are
    /// not tied to a specific route.
    pub(crate) fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        metrics: Arc<dyn MetricsCollector>,
        platform_service: Arc<dyn PlatformService>,
        health_registry: Arc<HealthCheckRegistry>,
        route_id: Option<String>,
    ) -> Self {
        Self {
            registry,
            languages,
            metrics,
            platform_service,
            health_registry,
            route_id,
        }
    }
}
206
207impl ComponentContext for ControllerComponentContext {
208    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
209        self.registry.lock().ok()?.get(scheme)
210    }
211
212    fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
213        self.languages.lock().ok()?.get(name).cloned()
214    }
215
216    fn metrics(&self) -> Arc<dyn MetricsCollector> {
217        Arc::clone(&self.metrics)
218    }
219
220    fn platform_service(&self) -> Arc<dyn PlatformService> {
221        Arc::clone(&self.platform_service)
222    }
223
224    fn register_route_health_check(
225        &self,
226        route_id: &str,
227        check: Arc<dyn camel_api::AsyncHealthCheck>,
228    ) {
229        self.health_registry.register_for_route(route_id, check);
230    }
231
232    fn unregister_route_health_check(&self, route_id: &str) {
233        self.health_registry.unregister_for_route(route_id);
234    }
235
236    fn route_id(&self) -> Option<&str> {
237        self.route_id.as_deref()
238    }
239}
240
241fn is_pending(ex: &Exchange) -> bool {
242    ex.property("CamelAggregatorPending")
243        .and_then(|v| v.as_bool())
244        .unwrap_or(false)
245}
246
247/// Wait for a pipeline service to be ready with circuit breaker backoff.
248///
249/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
250/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
251/// cancellation checks. It returns `Ok(())` when the service is ready, or
252/// `Err(e)` if cancellation occurred or a fatal error was encountered.
253async fn ready_with_backoff(
254    pipeline: &mut BoxProcessor,
255    cancel: &CancellationToken,
256) -> Result<(), CamelError> {
257    loop {
258        match pipeline.ready().await {
259            Ok(_) => return Ok(()),
260            Err(CamelError::CircuitOpen(ref msg)) => {
261                warn!("Circuit open, backing off: {msg}");
262                tokio::select! {
263                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
264                        continue;
265                    }
266                    _ = cancel.cancelled() => {
267                        // Shutting down — don't retry.
268                        return Err(CamelError::CircuitOpen(msg.clone()));
269                    }
270                }
271            }
272            Err(e) => {
273                // log-policy: system-broken
274                error!("Pipeline not ready: {e}");
275                return Err(e);
276            }
277        }
278    }
279}
280
281fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
282    let stamp = std::time::SystemTime::now()
283        .duration_since(std::time::UNIX_EPOCH)
284        .unwrap_or_default()
285        .as_nanos();
286    RuntimeCommand::FailRoute {
287        route_id: route_id.to_string(),
288        error: error.to_string(),
289        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
290        causation_id: None,
291    }
292}
293
294pub(super) async fn publish_runtime_failure(
295    runtime: Option<Weak<dyn RuntimeHandle>>,
296    route_id: &str,
297    error: &str,
298) {
299    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
300        return;
301    };
302    let command = runtime_failure_command(route_id, error);
303    if let Err(runtime_error) = runtime.execute(command).await {
304        warn!(
305            route_id = %route_id,
306            error = %runtime_error,
307            "failed to synchronize route crash with runtime projection"
308        );
309    }
310}
311
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform service handed to every component context this controller creates.
    platform_service: Arc<dyn PlatformService>,
    /// Optional function invoker passed to step resolution; staged functions are
    /// committed through it when a route is added directly.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Health-check registry shared with component contexts. When `None`, an
    /// isolated fallback registry is created on demand.
    health_registry: Option<Arc<HealthCheckRegistry>>,
}
344
345impl DefaultRouteController {
346    fn health_registry(&self) -> Arc<HealthCheckRegistry> {
347        self.health_registry.clone().unwrap_or_else(|| {
348            warn!("health_registry not configured — creating isolated fallback");
349            Arc::new(HealthCheckRegistry::new(Duration::from_secs(5)))
350        })
351    }
352
353    /// Create a new `DefaultRouteController` with the given registry.
354    pub fn new(
355        registry: Arc<std::sync::Mutex<Registry>>,
356        platform_service: Arc<dyn PlatformService>,
357    ) -> Self {
358        Self::with_beans_and_platform_service(
359            registry,
360            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
361            platform_service,
362        )
363    }
364
365    /// Create a new `DefaultRouteController` with shared bean registry.
366    pub fn with_beans(
367        registry: Arc<std::sync::Mutex<Registry>>,
368        beans: Arc<std::sync::Mutex<BeanRegistry>>,
369    ) -> Self {
370        Self::with_beans_and_platform_service(
371            registry,
372            beans,
373            Arc::new(NoopPlatformService::default()),
374        )
375    }
376
377    fn with_beans_and_platform_service(
378        registry: Arc<std::sync::Mutex<Registry>>,
379        beans: Arc<std::sync::Mutex<BeanRegistry>>,
380        platform_service: Arc<dyn PlatformService>,
381    ) -> Self {
382        Self {
383            routes: HashMap::new(),
384            registry,
385            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
386            beans,
387            runtime: None,
388            global_error_handler: None,
389            crash_notifier: None,
390            tracing_enabled: false,
391            tracer_detail_level: DetailLevel::Minimal,
392            tracer_metrics: None,
393            platform_service,
394            function_invoker: None,
395            health_registry: None,
396        }
397    }
398
399    /// Create a new `DefaultRouteController` with shared language registry.
400    pub fn with_languages(
401        registry: Arc<std::sync::Mutex<Registry>>,
402        languages: SharedLanguageRegistry,
403        platform_service: Arc<dyn PlatformService>,
404    ) -> Self {
405        Self {
406            routes: HashMap::new(),
407            registry,
408            languages,
409            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
410            runtime: None,
411            global_error_handler: None,
412            crash_notifier: None,
413            tracing_enabled: false,
414            tracer_detail_level: DetailLevel::Minimal,
415            tracer_metrics: None,
416            platform_service,
417            function_invoker: None,
418            health_registry: None,
419        }
420    }
421
    /// Create a new `DefaultRouteController` with shared language and bean registries.
    ///
    /// The controller starts with no routes, no runtime handle, no global error
    /// handler, no crash notifier, tracing disabled at `DetailLevel::Minimal`, and
    /// neither a function invoker nor a health registry configured.
    pub fn with_languages_and_beans(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        platform_service: Arc<dyn PlatformService>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
    ) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            languages,
            beans,
            runtime: None,
            global_error_handler: None,
            crash_notifier: None,
            tracing_enabled: false,
            tracer_detail_level: DetailLevel::Minimal,
            tracer_metrics: None,
            platform_service,
            function_invoker: None,
            health_registry: None,
        }
    }
444
445    pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
446        self.function_invoker = Some(function_invoker);
447        self
448    }
449
    /// Set the health-check registry shared with component contexts.
    ///
    /// Replaces any previously configured registry.
    pub fn set_health_registry(&mut self, registry: Arc<HealthCheckRegistry>) {
        self.health_registry = Some(registry);
    }
453
    /// Set the function invoker passed to step resolution.
    ///
    /// Replaces any previously configured invoker.
    pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
        self.function_invoker = Some(invoker);
    }
457
458    /// Set runtime handle for ProducerContext creation.
459    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
460        self.runtime = Some(Arc::downgrade(&runtime));
461    }
462
    /// Set the crash notifier for supervision.
    ///
    /// When set, the controller will send a [`CrashNotification`] whenever
    /// a consumer crashes. Replaces any previously configured sender.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }
470
    /// Set a global error handler applied to all routes without a per-route handler.
    ///
    /// The configuration is consulted when a route is built, so it affects routes
    /// built after this call.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }
475
476    /// Configure tracing for this route controller.
477    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
478        self.tracing_enabled = config.enabled;
479        self.tracer_detail_level = config.detail_level.clone();
480        self.tracer_metrics = config.metrics_collector.clone();
481    }
482
483    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
484        let mut producer_ctx = ProducerContext::new();
485        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
486            producer_ctx = producer_ctx.with_runtime(runtime);
487        }
488        Ok(producer_ctx)
489    }
490
491    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
492    fn resolve_error_handler(
493        &self,
494        config: ErrorHandlerConfig,
495        producer_ctx: &ProducerContext,
496        rt: Arc<dyn camel_component_api::RuntimeObservability>,
497        component_ctx: &dyn ComponentContext,
498    ) -> Result<ErrorHandlerLayer, CamelError> {
499        // Resolve DLC URI → producer.
500        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
501            let parsed = parse_uri(uri)?;
502            let component = component_ctx
503                .resolve_component(&parsed.scheme)
504                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
505            let endpoint = component.create_endpoint(uri, component_ctx)?;
506            Some(endpoint.create_producer(Arc::clone(&rt), producer_ctx)?)
507        } else {
508            None
509        };
510
511        // Resolve per-policy `handled_by` URIs.
512        let mut resolved_policies = Vec::new();
513        for policy in config.policies {
514            let handler_producer = if let Some(ref uri) = policy.handled_by {
515                let parsed = parse_uri(uri)?;
516                let component = component_ctx
517                    .resolve_component(&parsed.scheme)
518                    .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
519                let endpoint = component.create_endpoint(uri, component_ctx)?;
520                Some(endpoint.create_producer(Arc::clone(&rt), producer_ctx)?)
521            } else {
522                None
523            };
524            resolved_policies.push((policy, handler_producer));
525        }
526
527        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
528    }
529
530    /// Resolve a `UnitOfWorkConfig` into an `(ExchangeUoWLayer, Arc<AtomicU64>)`.
531    /// Returns `Err` if any hook URI cannot be resolved.
532    fn resolve_uow_layer(
533        &self,
534        config: &UnitOfWorkConfig,
535        producer_ctx: &ProducerContext,
536        rt: Arc<dyn camel_component_api::RuntimeObservability>,
537        component_ctx: &dyn ComponentContext,
538        counter: Option<Arc<AtomicU64>>,
539    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
540        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
541            let parsed = parse_uri(uri)?;
542            let component = component_ctx
543                .resolve_component(&parsed.scheme)
544                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
545            let endpoint = component.create_endpoint(uri, component_ctx)?;
546            endpoint
547                .create_producer(Arc::clone(&rt), producer_ctx)
548                .map_err(|e| {
549                    CamelError::RouteError(format!(
550                        "UoW hook URI '{uri}' could not be resolved: {e}"
551                    ))
552                })
553        };
554
555        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
556        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
557
558        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
559        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
560        Ok((layer, counter))
561    }
562
563    /// Resolve BuilderSteps into BoxProcessors.
564    pub(crate) fn resolve_steps(
565        &self,
566        steps: Vec<BuilderStep>,
567        producer_ctx: &ProducerContext,
568        registry: &Arc<std::sync::Mutex<Registry>>,
569        route_id: Option<&str>,
570        staging_mode: &super::step_resolution::FunctionStagingMode,
571    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
572        let component_ctx = Arc::new(ControllerComponentContext::new(
573            Arc::clone(registry),
574            Arc::clone(&self.languages),
575            self.tracer_metrics
576                .clone()
577                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
578            Arc::clone(&self.platform_service),
579            self.health_registry(),
580            route_id.map(|s| s.to_string()),
581        ));
582        let rt: Arc<dyn camel_component_api::RuntimeObservability> =
583            Arc::clone(&component_ctx) as Arc<_>;
584
585        super::step_resolution::resolve_steps(
586            steps,
587            producer_ctx,
588            rt,
589            registry,
590            &self.languages,
591            &self.beans,
592            self.function_invoker.clone(),
593            component_ctx,
594            route_id,
595            staging_mode,
596        )
597    }
598
599    /// Add a route definition to the controller.
600    ///
601    /// Steps are resolved immediately using the registry.
602    ///
603    /// # Errors
604    ///
605    /// Returns an error if:
606    /// - A route with the same ID already exists
607    /// - Step resolution fails
608    pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
609        let route_id = definition.route_id().to_string();
610
611        if self.routes.contains_key(&route_id) {
612            return Err(CamelError::RouteError(format!(
613                "Route '{}' already exists",
614                route_id
615            )));
616        }
617
618        info!(route_id = %route_id, "Adding route to controller");
619
620        let prepared = match self.build_managed_route(
621            definition,
622            &super::step_resolution::FunctionStagingMode::DirectAdd,
623        ) {
624            Ok(prepared) => prepared,
625            Err(err) => {
626                self.discard_function_staging();
627                return Err(err);
628            }
629        };
630
631        if let Some(invoker) = &self.function_invoker
632            && let Err(err) = invoker.commit_staged().await
633        {
634            invoker.discard_staging(0);
635            return Err(CamelError::Config(err.to_string()));
636        }
637
638        self.routes
639            .insert(prepared.route_id.clone(), prepared.managed);
640
641        Ok(())
642    }
643
    /// Compile a `RouteDefinition` into a `PreparedRoute` without registering it.
    ///
    /// Resolves the definition's steps into a processor pipeline, then wraps that
    /// pipeline — innermost to outermost — with whichever of the circuit breaker,
    /// security policy, error handler (per-route, else the global one) and
    /// unit-of-work layers are configured. The returned route has no task handles,
    /// fresh cancellation tokens, no channel sender and no aggregator service.
    ///
    /// When the first top-level aggregate step has a timeout condition or
    /// `force_completion_on_stop`, the steps before and after it are compiled into
    /// separate pipelines stored in `aggregate_split`, and the main pipeline starts
    /// from an identity processor instead.
    ///
    /// # Errors
    ///
    /// Propagates any failure from step resolution, error-handler URI resolution
    /// or unit-of-work hook resolution.
    fn build_managed_route(
        &self,
        definition: RouteDefinition,
        staging_mode: &super::step_resolution::FunctionStagingMode,
    ) -> Result<PreparedRoute, CamelError> {
        let route_id = definition.route_id().to_string();

        // Capture the introspection metadata before the definition is taken apart.
        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            security_policy,
            security_authenticator,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        let producer_ctx = self.build_producer_context()?;

        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_requiring_split(&steps) {
            Some((idx, agg_config)) => {
                // Partition the steps into [before aggregate] / aggregate / [after aggregate].
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                // The aggregate step itself is dropped; only its config is kept.
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(
                    pre_steps,
                    &producer_ctx,
                    &self.registry,
                    Some(&route_id),
                    staging_mode,
                )?;
                // Contracts (the second tuple element) are discarded for split pipelines.
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(
                    post_steps,
                    &producer_ctx,
                    &self.registry,
                    Some(&route_id),
                    staging_mode,
                )?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // No processors for the main pipeline: it becomes the identity processor below.
                vec![]
            }
            None => self.resolve_steps(
                steps,
                &producer_ctx,
                &self.registry,
                Some(&route_id),
                staging_mode,
            )?,
        };
        let route_id_for_tracing = route_id.clone();
        let mut pipeline = if processors_with_contracts.is_empty() {
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        // Layers are applied inside-out: each block below wraps everything built so far.
        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Cloned because the original config is also stored on the managed route.
        if let Some(sp_config) = security_policy.clone() {
            let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
            pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
        }

        // A per-route error handler takes precedence over the global one.
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = Arc::new(ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
                self.health_registry(),
                Some(route_id.clone()),
            ));
            let rt: Arc<dyn camel_component_api::RuntimeObservability> =
                Arc::clone(&component_ctx) as Arc<_>;
            let layer =
                self.resolve_error_handler(config, &producer_ctx, rt, component_ctx.as_ref())?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // The unit-of-work layer is outermost; its counter becomes the route's in-flight counter.
        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = Arc::new(ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
                self.health_registry(),
                Some(route_id.clone()),
            ));
            let rt: Arc<dyn camel_component_api::RuntimeObservability> =
                Arc::clone(&component_ctx) as Arc<_>;
            let (uow_layer, counter) = self.resolve_uow_layer(
                uow_config,
                &producer_ctx,
                rt,
                component_ctx.as_ref(),
                None,
            )?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        Ok(PreparedRoute {
            route_id,
            managed: ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
                security_policy,
                security_authenticator,
            },
        })
    }
804
805    pub(crate) fn insert_prepared_route(
806        &mut self,
807        prepared: PreparedRoute,
808    ) -> Result<(), CamelError> {
809        if self.routes.contains_key(&prepared.route_id) {
810            return Err(CamelError::RouteError(format!(
811                "Route '{}' already exists",
812                prepared.route_id
813            )));
814        }
815        self.routes
816            .insert(prepared.route_id.clone(), prepared.managed);
817        Ok(())
818    }
819
820    pub async fn add_route_with_generation(
821        &mut self,
822        definition: RouteDefinition,
823        generation: u64,
824    ) -> Result<(), CamelError> {
825        let route_id = definition.route_id().to_string();
826
827        if self.routes.contains_key(&route_id) {
828            return Err(CamelError::RouteError(format!(
829                "Route '{}' already exists",
830                route_id
831            )));
832        }
833
834        info!(route_id = %route_id, generation, "Adding route to controller with generation");
835
836        let prepared = self.build_managed_route(
837            definition,
838            &super::step_resolution::FunctionStagingMode::HotReload { generation },
839        )?;
840
841        self.routes
842            .insert(prepared.route_id.clone(), prepared.managed);
843
844        Ok(())
845    }
846
847    pub(crate) fn prepare_route_definition_with_generation(
848        &self,
849        definition: RouteDefinition,
850        generation: u64,
851    ) -> Result<PreparedRoute, CamelError> {
852        self.build_managed_route(
853            definition,
854            &super::step_resolution::FunctionStagingMode::HotReload { generation },
855        )
856    }
857
858    pub async fn remove_route_preserving_functions(
859        &mut self,
860        route_id: &str,
861    ) -> Result<(), CamelError> {
862        let managed = self.routes.get(route_id).ok_or_else(|| {
863            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
864        })?;
865        if handle_is_running(&managed.consumer_handle)
866            || handle_is_running(&managed.pipeline_handle)
867        {
868            return Err(CamelError::RouteError(format!(
869                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
870                route_id,
871                inferred_lifecycle_label(managed)
872            )));
873        }
874        self.routes.remove(route_id);
875        if let Some(reg) = &self.health_registry {
876            reg.unregister_for_route(route_id);
877        }
878        info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
879        Ok(())
880    }
881
882    pub fn compile_route_definition(
883        &self,
884        def: RouteDefinition,
885    ) -> Result<BoxProcessor, CamelError> {
886        let route_id = def.route_id().to_string();
887
888        let producer_ctx = self.build_producer_context()?;
889
890        let processors_with_contracts = self.resolve_steps(
891            def.steps,
892            &producer_ctx,
893            &self.registry,
894            Some(&route_id),
895            &super::step_resolution::FunctionStagingMode::DryCompile,
896        )?;
897        let mut pipeline = compose_traced_pipeline_with_contracts(
898            processors_with_contracts,
899            &route_id,
900            self.tracing_enabled,
901            self.tracer_detail_level.clone(),
902            self.tracer_metrics.clone(),
903        );
904
905        if let Some(cb_config) = def.circuit_breaker {
906            let cb_layer = CircuitBreakerLayer::new(cb_config);
907            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
908        }
909
910        if let Some(sp_config) = def.security_policy {
911            let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
912            pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
913        }
914
915        let eh_config = def
916            .error_handler
917            .clone()
918            .or_else(|| self.global_error_handler.clone());
919        if let Some(config) = eh_config {
920            let component_ctx = Arc::new(ControllerComponentContext::new(
921                Arc::clone(&self.registry),
922                Arc::clone(&self.languages),
923                self.tracer_metrics
924                    .clone()
925                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
926                Arc::clone(&self.platform_service),
927                self.health_registry(),
928                Some(route_id.clone()),
929            ));
930            let rt: Arc<dyn camel_component_api::RuntimeObservability> =
931                Arc::clone(&component_ctx) as Arc<_>;
932            let layer =
933                self.resolve_error_handler(config, &producer_ctx, rt, component_ctx.as_ref())?;
934            pipeline = BoxProcessor::new(layer.layer(pipeline));
935        }
936
937        // Apply UoW layer outermost
938        if let Some(uow_config) = &def.unit_of_work {
939            let existing_counter = self
940                .routes
941                .get(&route_id)
942                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
943
944            let component_ctx = Arc::new(ControllerComponentContext::new(
945                Arc::clone(&self.registry),
946                Arc::clone(&self.languages),
947                self.tracer_metrics
948                    .clone()
949                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
950                Arc::clone(&self.platform_service),
951                self.health_registry(),
952                Some(route_id.clone()),
953            ));
954            let rt: Arc<dyn camel_component_api::RuntimeObservability> =
955                Arc::clone(&component_ctx) as Arc<_>;
956
957            let (uow_layer, _counter) = self.resolve_uow_layer(
958                uow_config,
959                &producer_ctx,
960                rt,
961                component_ctx.as_ref(),
962                existing_counter,
963            )?;
964
965            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
966        }
967
968        Ok(pipeline)
969    }
970
971    pub fn compile_route_definition_with_generation(
972        &self,
973        def: RouteDefinition,
974        generation: u64,
975    ) -> Result<BoxProcessor, CamelError> {
976        let route_id = def.route_id().to_string();
977
978        let producer_ctx = self.build_producer_context()?;
979
980        let processors_with_contracts = self.resolve_steps(
981            def.steps,
982            &producer_ctx,
983            &self.registry,
984            Some(&route_id),
985            &super::step_resolution::FunctionStagingMode::HotReload { generation },
986        )?;
987        let mut pipeline = compose_traced_pipeline_with_contracts(
988            processors_with_contracts,
989            &route_id,
990            self.tracing_enabled,
991            self.tracer_detail_level.clone(),
992            self.tracer_metrics.clone(),
993        );
994
995        if let Some(cb_config) = def.circuit_breaker {
996            let cb_layer = CircuitBreakerLayer::new(cb_config);
997            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
998        }
999
1000        if let Some(sp_config) = def.security_policy {
1001            let sp_layer = SecurityPolicyLayer::new(sp_config.policy);
1002            pipeline = BoxProcessor::new(sp_layer.layer(pipeline));
1003        }
1004
1005        let eh_config = def
1006            .error_handler
1007            .clone()
1008            .or_else(|| self.global_error_handler.clone());
1009        if let Some(config) = eh_config {
1010            let component_ctx = Arc::new(ControllerComponentContext::new(
1011                Arc::clone(&self.registry),
1012                Arc::clone(&self.languages),
1013                self.tracer_metrics
1014                    .clone()
1015                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1016                Arc::clone(&self.platform_service),
1017                self.health_registry(),
1018                Some(route_id.clone()),
1019            ));
1020            let rt: Arc<dyn camel_component_api::RuntimeObservability> =
1021                Arc::clone(&component_ctx) as Arc<_>;
1022            let layer =
1023                self.resolve_error_handler(config, &producer_ctx, rt, component_ctx.as_ref())?;
1024            pipeline = BoxProcessor::new(layer.layer(pipeline));
1025        }
1026
1027        if let Some(uow_config) = &def.unit_of_work {
1028            let existing_counter = self
1029                .routes
1030                .get(&route_id)
1031                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
1032
1033            let component_ctx = Arc::new(ControllerComponentContext::new(
1034                Arc::clone(&self.registry),
1035                Arc::clone(&self.languages),
1036                self.tracer_metrics
1037                    .clone()
1038                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1039                Arc::clone(&self.platform_service),
1040                self.health_registry(),
1041                Some(route_id.clone()),
1042            ));
1043            let rt: Arc<dyn camel_component_api::RuntimeObservability> =
1044                Arc::clone(&component_ctx) as Arc<_>;
1045
1046            let (uow_layer, _counter) = self.resolve_uow_layer(
1047                uow_config,
1048                &producer_ctx,
1049                rt,
1050                component_ctx.as_ref(),
1051                existing_counter,
1052            )?;
1053
1054            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1055        }
1056
1057        Ok(pipeline)
1058    }
1059
1060    /// Remove a route from the controller map.
1061    ///
1062    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
1063    /// Returns an error if the route is still running or does not exist.
1064    /// Does not cancel any running tasks — call `stop_route` first.
1065    pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1066        let managed = self.routes.get(route_id).ok_or_else(|| {
1067            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
1068        })?;
1069        if handle_is_running(&managed.consumer_handle)
1070            || handle_is_running(&managed.pipeline_handle)
1071        {
1072            return Err(CamelError::RouteError(format!(
1073                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
1074                route_id,
1075                inferred_lifecycle_label(managed)
1076            )));
1077        }
1078        if let Some(invoker) = &self.function_invoker {
1079            for (id, rid) in self.collect_function_refs(route_id) {
1080                if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
1081                    warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
1082                }
1083            }
1084        }
1085        self.routes.remove(route_id);
1086        if let Some(reg) = &self.health_registry {
1087            reg.unregister_for_route(route_id);
1088        }
1089        info!(route_id = %route_id, "Route removed from controller");
1090        Ok(())
1091    }
1092
1093    fn collect_function_refs(
1094        &self,
1095        route_id: &str,
1096    ) -> Vec<(camel_api::FunctionId, Option<String>)> {
1097        self.function_invoker
1098            .as_ref()
1099            .map(|invoker| invoker.function_refs_for_route(route_id))
1100            .unwrap_or_default()
1101    }
1102
1103    fn discard_function_staging(&self) {
1104        if let Some(invoker) = &self.function_invoker {
1105            invoker.discard_staging(0);
1106        }
1107    }
1108
1109    /// Returns the number of routes in the controller.
1110    pub fn route_count(&self) -> usize {
1111        self.routes.len()
1112    }
1113
1114    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1115        self.routes.get(route_id).map(|r| {
1116            r.in_flight
1117                .as_ref()
1118                .map_or(0, |c| c.load(Ordering::Relaxed))
1119        })
1120    }
1121
1122    /// Returns `true` if a route with the given ID exists.
1123    pub fn route_exists(&self, route_id: &str) -> bool {
1124        self.routes.contains_key(route_id)
1125    }
1126
1127    /// Returns all route IDs.
1128    pub fn route_ids(&self) -> Vec<String> {
1129        self.routes.keys().cloned().collect()
1130    }
1131
1132    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
1133        self.routes
1134            .get(route_id)
1135            .and_then(|m| m.definition.source_hash())
1136    }
1137
1138    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
1139    pub fn auto_startup_route_ids(&self) -> Vec<String> {
1140        let mut pairs: Vec<(String, i32)> = self
1141            .routes
1142            .iter()
1143            .filter(|(_, managed)| managed.definition.auto_startup())
1144            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1145            .collect();
1146        pairs.sort_by_key(|(_, order)| *order);
1147        pairs.into_iter().map(|(id, _)| id).collect()
1148    }
1149
1150    /// Returns route IDs sorted by shutdown order (startup order descending).
1151    pub fn shutdown_route_ids(&self) -> Vec<String> {
1152        let mut pairs: Vec<(String, i32)> = self
1153            .routes
1154            .iter()
1155            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1156            .collect();
1157        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1158        pairs.into_iter().map(|(id, _)| id).collect()
1159    }
1160
1161    /// Atomically swap the pipeline of a route.
1162    ///
1163    /// In-flight requests finish with the old pipeline (kept alive by Arc).
1164    /// New requests immediately use the new pipeline.
1165    pub fn swap_pipeline(
1166        &self,
1167        route_id: &str,
1168        new_pipeline: BoxProcessor,
1169    ) -> Result<(), CamelError> {
1170        let managed = self
1171            .routes
1172            .get(route_id)
1173            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1174
1175        if managed.aggregate_split.is_some() {
1176            tracing::warn!(
1177                route_id = %route_id,
1178                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
1179            );
1180        }
1181
1182        managed
1183            .pipeline
1184            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1185        info!(route_id = %route_id, "Pipeline swapped atomically");
1186        Ok(())
1187    }
1188
1189    /// Returns the from_uri of a route, if it exists.
1190    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1191        self.routes.get(route_id).map(|r| r.from_uri.clone())
1192    }
1193
1194    /// Get a clone of the current pipeline for a route.
1195    ///
1196    /// This is useful for testing and introspection.
1197    /// Returns `None` if the route doesn't exist.
1198    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1199        self.routes
1200            .get(route_id)
1201            .map(|r| r.pipeline.load().0.clone())
1202    }
1203
1204    /// Internal stop implementation that can set custom status.
1205    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1206        super::consumer_management::stop_route_internal(
1207            &mut self.routes,
1208            route_id,
1209            DEFAULT_SHUTDOWN_TIMEOUT,
1210        )
1211        .await
1212    }
1213
1214    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1215        self.start_route(route_id).await
1216    }
1217
1218    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1219        self.stop_route(route_id).await
1220    }
1221}
1222
1223#[async_trait::async_trait]
1224impl RouteController for DefaultRouteController {
1225    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1226        // Check if route exists and can be started.
1227        {
1228            let managed = self
1229                .routes
1230                .get_mut(route_id)
1231                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1232
1233            let consumer_running = handle_is_running(&managed.consumer_handle);
1234            let pipeline_running = handle_is_running(&managed.pipeline_handle);
1235            if consumer_running && pipeline_running {
1236                return Ok(());
1237            }
1238            if !consumer_running && pipeline_running {
1239                return Err(CamelError::RouteError(format!(
1240                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
1241                    route_id
1242                )));
1243            }
1244            if consumer_running && !pipeline_running {
1245                return Err(CamelError::RouteError(format!(
1246                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
1247                    route_id
1248                )));
1249            }
1250        }
1251
1252        info!(route_id = %route_id, "Starting route");
1253
1254        // Get the resolved route info
1255        let (from_uri, pipeline, concurrency) = {
1256            let managed = self
1257                .routes
1258                .get(route_id)
1259                .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1260            (
1261                managed.from_uri.clone(),
1262                Arc::clone(&managed.pipeline),
1263                managed.concurrency.clone(),
1264            )
1265        };
1266
1267        // Clone crash notifier for consumer task
1268        let crash_notifier = self.crash_notifier.clone();
1269        let runtime_for_consumer = self.runtime.clone();
1270
1271        let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
1272            Arc::clone(&self.registry),
1273            Arc::clone(&self.languages),
1274            self.tracer_metrics
1275                .clone()
1276                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1277            Arc::clone(&self.platform_service),
1278            self.health_registry(),
1279            Some(route_id.to_string()),
1280        ));
1281        let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
1282            Arc::clone(&consumer_component_ctx) as Arc<_>;
1283        let (mut consumer, consumer_concurrency) =
1284            super::consumer_management::create_route_consumer(
1285                consumer_rt,
1286                &self.registry,
1287                &from_uri,
1288                consumer_component_ctx.as_ref(),
1289            )?;
1290
1291        // Resolve effective concurrency: route override > consumer default
1292        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1293
1294        // Get the managed route for mutation
1295        let managed = self
1296            .routes
1297            .get_mut(route_id)
1298            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1299
1300        // Wire security context before spawning consumer
1301        if let (Some(sp_config), Some(authenticator)) = (
1302            managed.security_policy.as_ref(),
1303            managed.security_authenticator.as_ref(),
1304        ) {
1305            use camel_component_api::SecurityContext;
1306            let sec_ctx =
1307                SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
1308            consumer.set_security_context(sec_ctx);
1309        }
1310
1311        // Create channel for consumer to send exchanges
1312        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1313        // Create child tokens for independent lifecycle control
1314        let consumer_cancel = managed.consumer_cancel_token.child_token();
1315        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1316        // Clone sender for storage (to reuse on resume)
1317        let tx_for_storage = tx.clone();
1318        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1319
1320        // --- Aggregator v2: check for aggregate route with timeout ---
1321        let managed = self
1322            .routes
1323            .get_mut(route_id)
1324            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1325
1326        if let Some(split) = managed.aggregate_split.as_ref() {
1327            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
1328
1329            let route_cancel_clone = pipeline_cancel.clone();
1330            let svc = AggregatorService::new(
1331                split.agg_config.clone(),
1332                late_tx,
1333                Arc::clone(&self.languages),
1334                route_cancel_clone,
1335            );
1336            let agg = Arc::new(std::sync::Mutex::new(svc));
1337
1338            let pipeline_cancel_for_monitor = pipeline_cancel.clone();
1339            let agg_for_monitor = Arc::clone(&agg);
1340
1341            managed.agg_service = Some(Arc::clone(&agg));
1342
1343            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
1344            let pre_pipeline = Arc::clone(&split.pre_pipeline);
1345            let post_pipeline = Arc::clone(&split.post_pipeline);
1346
1347            // Spawn biased select forward loop
1348            let pipeline_handle = tokio::spawn(async move {
1349                loop {
1350                    tokio::select! {
1351                        biased;
1352
1353                        late_ex = async {
1354                            let mut rx = late_rx.lock().await;
1355                            rx.recv().await
1356                        } => {
1357                            match late_ex {
1358                                Some(ex) => {
1359                                    let pipe = post_pipeline.load();
1360                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
1361                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
1362                                    }
1363                                }
1364                                None => return,
1365                            }
1366                        }
1367
1368                        envelope_opt = rx.recv() => {
1369                            match envelope_opt {
1370                                Some(envelope) => {
1371                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
1372                                    let pre_pipe = pre_pipeline.load();
1373                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
1374                                        Ok(ex) => ex,
1375                                        Err(e) => {
1376                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1377                                            continue;
1378                                        }
1379                                    };
1380
1381                                    let ex = {
1382                                        let cloned_svc = agg
1383                                            .lock()
1384                                            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
1385                                            .clone();
1386                                        cloned_svc.oneshot(ex).await
1387                                    };
1388
1389                                    match ex {
1390                                        Ok(ex) => {
1391                                            if !is_pending(&ex) {
1392                                                let post_pipe = post_pipeline.load();
1393                                                let out = post_pipe.0.clone().oneshot(ex).await;
1394                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
1395                                            } else if let Some(tx) = reply_tx {
1396                                                let _ = tx.send(Ok(ex));
1397                                            }
1398                                        }
1399                                        Err(e) => {
1400                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1401                                        }
1402                                    }
1403                                }
1404                                None => return,
1405                            }
1406                        }
1407
1408                        _ = pipeline_cancel.cancelled() => {
1409                            {
1410                                let guard = agg
1411                                    .lock()
1412                                    .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
1413                                guard.force_complete_all();
1414                            }
1415                            let mut rx_guard = late_rx.lock().await;
1416                            while let Ok(late_ex) = rx_guard.try_recv() {
1417                                let pipe = post_pipeline.load();
1418                                let _ = pipe.0.clone().oneshot(late_ex).await;
1419                            }
1420                            break;
1421                        }
1422                    }
1423                }
1424            });
1425            #[cfg(test)]
1426            emit_start_route_event("pipeline_spawned");
1427
1428            // Start consumer after pipeline loop is spawned to avoid startup races
1429            // where consumers emit exchanges before the route pipeline begins polling.
1430            let consumer_handle = super::consumer_management::spawn_consumer_task(
1431                route_id.to_string(),
1432                consumer,
1433                consumer_ctx,
1434                crash_notifier,
1435                runtime_for_consumer,
1436                false,
1437            );
1438
1439            // Extend the stored consumer handle through aggregate force-completion.
1440            // While this monitor drains pending buckets, handle_is_running still reports
1441            // the Route as running because forced exchanges may still be in post-pipeline.
1442            let force_on_stop = agg_for_monitor
1443                .lock()
1444                .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
1445                .config()
1446                .force_completion_on_stop;
1447            let consumer_handle = tokio::spawn(async move {
1448                let _ = consumer_handle.await;
1449                if !pipeline_cancel_for_monitor.is_cancelled() {
1450                    let guard = agg_for_monitor
1451                        .lock()
1452                        .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
1453                    guard.force_complete_all();
1454                    drop(guard);
1455                    if force_on_stop {
1456                        pipeline_cancel_for_monitor.cancel();
1457                    }
1458                }
1459            });
1460            #[cfg(test)]
1461            emit_start_route_event("consumer_spawned");
1462
1463            let managed = self
1464                .routes
1465                .get_mut(route_id)
1466                .expect("invariant: route must exist"); // allow-unwrap
1467            managed.consumer_handle = Some(consumer_handle);
1468            managed.pipeline_handle = Some(pipeline_handle);
1469            managed.channel_sender = Some(tx_for_storage);
1470
1471            info!(route_id = %route_id, "Route started (aggregate with timeout)");
1472            return Ok(());
1473        }
1474        // --- End aggregator v2 branch ---
1475
1476        // Spawn pipeline task with its own cancellation token
1477        let pipeline_handle = match effective_concurrency {
1478            ConcurrencyModel::Sequential => {
1479                tokio::spawn(async move {
1480                    loop {
1481                        // Use select! to exit promptly on cancellation even when idle
1482                        let envelope = tokio::select! {
1483                            envelope = rx.recv() => match envelope {
1484                                Some(e) => e,
1485                                None => return, // Channel closed
1486                            },
1487                            _ = pipeline_cancel.cancelled() => {
1488                                // Cancellation requested - exit gracefully
1489                                return;
1490                            }
1491                        };
1492                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1493
1494                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
1495                        let mut pipeline = pipeline.load().0.clone();
1496
1497                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1498                            if let Some(tx) = reply_tx {
1499                                let _ = tx.send(Err(e));
1500                            }
1501                            return;
1502                        }
1503
1504                        let result = pipeline.call(exchange).await;
1505                        if let Some(tx) = reply_tx {
1506                            let _ = tx.send(result);
1507                        } else if let Err(ref e) = result
1508                            && !matches!(e, CamelError::Stopped)
1509                        {
1510                            // log-policy: system-broken
1511                            error!("Pipeline error: {e}");
1512                        }
1513                    }
1514                })
1515            }
1516            ConcurrencyModel::Concurrent { max } => {
1517                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1518                tokio::spawn(async move {
1519                    loop {
1520                        // Use select! to exit promptly on cancellation even when idle
1521                        let envelope = tokio::select! {
1522                            envelope = rx.recv() => match envelope {
1523                                Some(e) => e,
1524                                None => return, // Channel closed
1525                            },
1526                            _ = pipeline_cancel.cancelled() => {
1527                                // Cancellation requested - exit gracefully
1528                                return;
1529                            }
1530                        };
1531                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1532                        let pipe_ref = Arc::clone(&pipeline);
1533                        let sem = sem.clone();
1534                        let cancel = pipeline_cancel.clone();
1535                        tokio::spawn(async move {
1536                            // Acquire semaphore permit if bounded
1537                            let _permit = match &sem {
1538                                Some(s) => Some(s.acquire().await.expect("semaphore closed")), // allow-unwrap
1539                                None => None,
1540                            };
1541
1542                            // Load current pipeline from ArcSwap
1543                            let mut pipe = pipe_ref.load().0.clone();
1544
1545                            // Wait for service ready with circuit breaker backoff
1546                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1547                                if let Some(tx) = reply_tx {
1548                                    let _ = tx.send(Err(e));
1549                                }
1550                                return;
1551                            }
1552
1553                            let result = pipe.call(exchange).await;
1554                            if let Some(tx) = reply_tx {
1555                                let _ = tx.send(result);
1556                            } else if let Err(ref e) = result
1557                                && !matches!(e, CamelError::Stopped)
1558                            {
1559                                // log-policy: system-broken
1560                                error!("Pipeline error: {e}");
1561                            }
1562                        });
1563                    }
1564                })
1565            }
1566        };
1567        #[cfg(test)]
1568        emit_start_route_event("pipeline_spawned");
1569
1570        // Start consumer after pipeline task is spawned to minimize the chance of
1571        // fire-and-forget events being produced before the pipeline loop is active.
1572        let consumer_handle = super::consumer_management::spawn_consumer_task(
1573            route_id.to_string(),
1574            consumer,
1575            consumer_ctx,
1576            crash_notifier,
1577            runtime_for_consumer,
1578            false,
1579        );
1580        #[cfg(test)]
1581        emit_start_route_event("consumer_spawned");
1582
1583        // Store handles and update status
1584        let managed = self
1585            .routes
1586            .get_mut(route_id)
1587            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1588        managed.consumer_handle = Some(consumer_handle);
1589        managed.pipeline_handle = Some(pipeline_handle);
1590        managed.channel_sender = Some(tx_for_storage);
1591
1592        info!(route_id = %route_id, "Route started");
1593        Ok(())
1594    }
1595
1596    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1597        self.stop_route_internal(route_id).await?;
1598        if let Some(reg) = &self.health_registry {
1599            reg.unregister_for_route(route_id);
1600        }
1601        Ok(())
1602    }
1603
1604    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1605        self.stop_route(route_id).await?;
1606        tokio::time::sleep(Duration::from_millis(100)).await;
1607        self.start_route(route_id).await
1608    }
1609
1610    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1611        // Check route exists and state.
1612        let managed = self
1613            .routes
1614            .get_mut(route_id)
1615            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1616
1617        let consumer_running = handle_is_running(&managed.consumer_handle);
1618        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1619
1620        // Can only suspend from active started state.
1621        if !consumer_running || !pipeline_running {
1622            return Err(CamelError::RouteError(format!(
1623                "Cannot suspend route '{}' with execution lifecycle {}",
1624                route_id,
1625                inferred_lifecycle_label(managed)
1626            )));
1627        }
1628
1629        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1630
1631        // Cancel consumer token only (keep pipeline running)
1632        let managed = self
1633            .routes
1634            .get_mut(route_id)
1635            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1636        managed.consumer_cancel_token.cancel();
1637
1638        // Take and join consumer handle
1639        let managed = self
1640            .routes
1641            .get_mut(route_id)
1642            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1643        let consumer_handle = managed.consumer_handle.take();
1644
1645        // Wait for consumer task to complete with timeout
1646        let timeout_result = tokio::time::timeout(DEFAULT_SHUTDOWN_TIMEOUT, async {
1647            if let Some(handle) = consumer_handle {
1648                let _ = handle.await;
1649            }
1650        })
1651        .await;
1652
1653        if timeout_result.is_err() {
1654            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1655        }
1656
1657        // Get the managed route again (can't hold across await)
1658        let managed = self
1659            .routes
1660            .get_mut(route_id)
1661            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1662
1663        // Create fresh cancellation token for consumer (for resume)
1664        managed.consumer_cancel_token = CancellationToken::new();
1665
1666        info!(route_id = %route_id, "Route suspended (pipeline still running)");
1667        Ok(())
1668    }
1669
1670    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1671        // Check route exists and is Suspended-equivalent execution state.
1672        let managed = self
1673            .routes
1674            .get(route_id)
1675            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1676
1677        let consumer_running = handle_is_running(&managed.consumer_handle);
1678        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1679        if consumer_running || !pipeline_running {
1680            return Err(CamelError::RouteError(format!(
1681                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1682                route_id,
1683                inferred_lifecycle_label(managed)
1684            )));
1685        }
1686
1687        // Get the stored channel sender (must exist for a suspended route)
1688        let sender = managed.channel_sender.clone().ok_or_else(|| {
1689            CamelError::RouteError("Suspended route has no channel sender".into())
1690        })?;
1691
1692        // Get from_uri and concurrency for creating new consumer
1693        let from_uri = managed.from_uri.clone();
1694
1695        info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1696
1697        let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
1698            Arc::clone(&self.registry),
1699            Arc::clone(&self.languages),
1700            self.tracer_metrics
1701                .clone()
1702                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1703            Arc::clone(&self.platform_service),
1704            self.health_registry(),
1705            Some(route_id.to_string()),
1706        ));
1707        let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
1708            Arc::clone(&consumer_component_ctx) as Arc<_>;
1709        let (mut consumer, _) = super::consumer_management::create_route_consumer(
1710            consumer_rt,
1711            &self.registry,
1712            &from_uri,
1713            consumer_component_ctx.as_ref(),
1714        )?;
1715
1716        // Wire security context before spawning consumer
1717        let managed = self
1718            .routes
1719            .get(route_id)
1720            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1721        if let (Some(sp_config), Some(authenticator)) = (
1722            managed.security_policy.as_ref(),
1723            managed.security_authenticator.as_ref(),
1724        ) {
1725            use camel_component_api::SecurityContext;
1726            let sec_ctx =
1727                SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
1728            consumer.set_security_context(sec_ctx);
1729        }
1730
1731        // Get the managed route for mutation
1732        let managed = self
1733            .routes
1734            .get_mut(route_id)
1735            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1736
1737        // Create child token for consumer lifecycle
1738        let consumer_cancel = managed.consumer_cancel_token.child_token();
1739
1740        let crash_notifier = self.crash_notifier.clone();
1741        let runtime_for_consumer = self.runtime.clone();
1742
1743        // Create ConsumerContext with the stored sender
1744        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1745
1746        // Spawn consumer task
1747        let consumer_handle = super::consumer_management::spawn_consumer_task(
1748            route_id.to_string(),
1749            consumer,
1750            consumer_ctx,
1751            crash_notifier,
1752            runtime_for_consumer,
1753            true,
1754        );
1755
1756        // Store consumer handle and update status
1757        let managed = self
1758            .routes
1759            .get_mut(route_id)
1760            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
1761        managed.consumer_handle = Some(consumer_handle);
1762
1763        info!(route_id = %route_id, "Route resumed");
1764        Ok(())
1765    }
1766
1767    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1768        // Only start routes where auto_startup() == true
1769        // Sort by startup_order() ascending before starting
1770        let route_ids: Vec<String> = {
1771            let mut pairs: Vec<_> = self
1772                .routes
1773                .iter()
1774                .filter(|(_, r)| r.definition.auto_startup())
1775                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1776                .collect();
1777            pairs.sort_by_key(|(_, order)| *order);
1778            pairs.into_iter().map(|(id, _)| id).collect()
1779        };
1780
1781        info!("Starting {} auto-startup routes", route_ids.len());
1782
1783        // Collect errors but continue starting remaining routes
1784        let mut errors: Vec<String> = Vec::new();
1785        for route_id in route_ids {
1786            if let Err(e) = self.start_route(&route_id).await {
1787                errors.push(format!("Route '{}': {}", route_id, e));
1788            }
1789        }
1790
1791        if !errors.is_empty() {
1792            return Err(CamelError::RouteError(format!(
1793                "Failed to start routes: {}",
1794                errors.join(", ")
1795            )));
1796        }
1797
1798        info!("All auto-startup routes started");
1799        Ok(())
1800    }
1801
1802    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1803        // Sort by startup_order descending (reverse order)
1804        let route_ids: Vec<String> = {
1805            let mut pairs: Vec<_> = self
1806                .routes
1807                .iter()
1808                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1809                .collect();
1810            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1811            pairs.into_iter().map(|(id, _)| id).collect()
1812        };
1813
1814        info!("Stopping {} routes", route_ids.len());
1815
1816        for route_id in route_ids {
1817            let _ = self.stop_route(&route_id).await;
1818        }
1819
1820        info!("All routes stopped");
1821        Ok(())
1822    }
1823}
1824
1825#[cfg(test)]
1826#[path = "route_controller_tests.rs"]
1827mod tests;