Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21    BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
22    NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeCommand,
23    RuntimeHandle,
24};
25use camel_component_api::{
26    ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
27};
28use camel_endpoint::parse_uri;
29pub use camel_processor::aggregator::SharedLanguageRegistry;
30use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
31use camel_processor::circuit_breaker::CircuitBreakerLayer;
32use camel_processor::error_handler::ErrorHandlerLayer;
33
34use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
35use crate::lifecycle::adapters::route_compiler::{
36    compose_pipeline, compose_traced_pipeline_with_contracts,
37};
38use crate::lifecycle::application::route_definition::{
39    BuilderStep, RouteDefinition, RouteDefinitionInfo,
40};
41use crate::shared::components::domain::Registry;
42use crate::shared::observability::domain::{DetailLevel, TracerConfig};
43use arc_swap::ArcSwap;
44use camel_bean::BeanRegistry;
45
/// Notification sent when a route crashes.
///
/// Delivered through the channel registered via
/// `DefaultRouteController::set_crash_notifier` so a supervisor can react.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// The ID of the crashed route.
    pub route_id: String,
    /// The error that caused the crash.
    pub error: String,
}
54
/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
///
/// # Safety
///
/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
///
/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
///    (creates a new boxed service from the inner clone).
/// 2. The clone is owned by the calling thread and never shared.
/// 3. ArcSwap guarantees we only get & references (no &mut).
///
/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
/// clone() does not mutate shared state and each thread gets an independent copy.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: see the type-level safety argument above — `&SyncBoxProcessor` is only
// ever used to clone the inner processor, which does not mutate shared state.
unsafe impl Sync for SyncBoxProcessor {}
71
72type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
73
// Test-only observation hook: tests may install a callback to record
// start-route lifecycle events emitted by the controller.
#[cfg(test)]
type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;

#[cfg(test)]
static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
80
/// Install (or clear, with `None`) the test-only start-route event hook.
#[cfg(test)]
fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
    let mut slot = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    *slot = hook;
}
87
/// Invoke the installed test hook (if any) with the given event label.
#[cfg(test)]
fn emit_start_route_event(event: &'static str) {
    let guard = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    if let Some(hook) = guard.as_ref() {
        hook(event);
    }
}
98
/// Pipeline halves and aggregator config for a route whose first top-level
/// aggregate step has a timeout-based completion condition.
pub(super) struct AggregateSplitInfo {
    /// Pipeline for the steps before the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    /// Pipeline for the steps after the aggregate step.
    pub(super) post_pipeline: SharedPipeline,
}
105
/// Internal state for a managed route.
pub(super) struct ManagedRoute {
    /// The route definition metadata (for introspection).
    pub(super) definition: RouteDefinitionInfo,
    /// Source endpoint URI.
    pub(super) from_uri: String,
    /// Resolved processor pipeline (wrapped for atomic swap).
    pub(super) pipeline: SharedPipeline,
    /// Concurrency model override (if any).
    pub(super) concurrency: Option<ConcurrencyModel>,
    /// Handle for the consumer task (if running).
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping the consumer task.
    /// This allows independent control of the consumer lifecycle (for suspend/resume).
    pub(super) consumer_cancel_token: CancellationToken,
    /// Cancellation token for stopping the pipeline task.
    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
    pub(super) pipeline_cancel_token: CancellationToken,
    /// Channel sender for sending exchanges to the pipeline.
    /// Stored to allow resuming a suspended route without recreating the channel.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// In-flight exchange counter. `None` when UoW is not configured for this route.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Split pipelines when the route has a timeout-based aggregate step.
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    /// Aggregator service instance when the aggregate split is active.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
133
/// A route that has been compiled but not yet registered with the controller.
pub(crate) struct PreparedRoute {
    /// ID the route will be registered under.
    pub(crate) route_id: String,
    /// Fully built managed-route state, ready to insert.
    pub(super) managed: ManagedRoute,
}
138
139pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
140    handle.as_ref().is_some_and(|h| !h.is_finished())
141}
142
143fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
144    match (
145        handle_is_running(&managed.consumer_handle),
146        handle_is_running(&managed.pipeline_handle),
147    ) {
148        (true, true) => "Started",
149        (false, true) => "Suspended",
150        (true, false) => "Stopping",
151        (false, false) => "Stopped",
152    }
153}
154
155fn find_top_level_aggregate_with_timeout(
156    steps: &[BuilderStep],
157) -> Option<(usize, AggregatorConfig)> {
158    for (i, step) in steps.iter().enumerate() {
159        if let BuilderStep::Aggregate { config } = step {
160            if has_timeout_condition(&config.completion) {
161                return Some((i, config.clone()));
162            }
163            break;
164        }
165    }
166    None
167}
168
/// [`ComponentContext`] implementation backed by the controller's shared
/// registries and services.
pub(crate) struct ControllerComponentContext {
    // Component registry for scheme -> component lookup.
    registry: Arc<std::sync::Mutex<Registry>>,
    // Declarative expression languages by name.
    languages: SharedLanguageRegistry,
    // Metrics collector handed out to components.
    metrics: Arc<dyn MetricsCollector>,
    // Platform service handed out to components.
    platform_service: Arc<dyn PlatformService>,
}
175
impl ControllerComponentContext {
    /// Bundle the controller's shared registries and services into a context
    /// that can be handed to components during endpoint/producer creation.
    pub(crate) fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        metrics: Arc<dyn MetricsCollector>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            registry,
            languages,
            metrics,
            platform_service,
        }
    }
}
191
impl ComponentContext for ControllerComponentContext {
    // Look up a component by URI scheme. A poisoned registry lock is treated
    // as "not found" (`.ok()?` maps the poison error to `None`).
    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
        self.registry.lock().ok()?.get(scheme)
    }

    // Look up an expression language by name; poisoned lock → `None`.
    fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
        self.languages.lock().ok()?.get(name).cloned()
    }

    fn metrics(&self) -> Arc<dyn MetricsCollector> {
        Arc::clone(&self.metrics)
    }

    fn platform_service(&self) -> Arc<dyn PlatformService> {
        Arc::clone(&self.platform_service)
    }
}
209
210fn is_pending(ex: &Exchange) -> bool {
211    ex.property("CamelAggregatorPending")
212        .and_then(|v| v.as_bool())
213        .unwrap_or(false)
214}
215
/// Wait for a pipeline service to be ready with circuit breaker backoff.
///
/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
/// cancellation checks. It returns `Ok(())` when the service is ready, or
/// `Err(e)` if cancellation occurred or a fatal error was encountered.
async fn ready_with_backoff(
    pipeline: &mut BoxProcessor,
    cancel: &CancellationToken,
) -> Result<(), CamelError> {
    loop {
        match pipeline.ready().await {
            Ok(_) => return Ok(()),
            Err(CamelError::CircuitOpen(ref msg)) => {
                warn!("Circuit open, backing off: {msg}");
                // Retry after 1s, unless cancellation fires first.
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                        continue;
                    }
                    _ = cancel.cancelled() => {
                        // Shutting down — don't retry.
                        return Err(CamelError::CircuitOpen(msg.clone()));
                    }
                }
            }
            Err(e) => {
                // Any non-circuit-open error is fatal for readiness.
                error!("Pipeline not ready: {e}");
                return Err(e);
            }
        }
    }
}
248
249fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
250    let stamp = std::time::SystemTime::now()
251        .duration_since(std::time::UNIX_EPOCH)
252        .unwrap_or_default()
253        .as_nanos();
254    RuntimeCommand::FailRoute {
255        route_id: route_id.to_string(),
256        error: error.to_string(),
257        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
258        causation_id: None,
259    }
260}
261
262pub(super) async fn publish_runtime_failure(
263    runtime: Option<Weak<dyn RuntimeHandle>>,
264    route_id: &str,
265    error: &str,
266) {
267    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
268        return;
269    };
270    let command = runtime_failure_command(route_id, error);
271    if let Err(runtime_error) = runtime.execute(command).await {
272        warn!(
273            route_id = %route_id,
274            error = %runtime_error,
275            "failed to synchronize route crash with runtime projection"
276        );
277    }
278}
279
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform service passed into component contexts.
    platform_service: Arc<dyn PlatformService>,
    /// Optional invoker for user-defined functions referenced by routes.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
}
311
312impl DefaultRouteController {
    /// Create a new `DefaultRouteController` with the given registry.
    ///
    /// Uses a fresh, empty [`BeanRegistry`].
    pub fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
            platform_service,
        )
    }
324
    /// Create a new `DefaultRouteController` with shared bean registry.
    ///
    /// Uses a [`NoopPlatformService`] as the platform service.
    pub fn with_beans(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            beans,
            Arc::new(NoopPlatformService::default()),
        )
    }
336
    // Shared private constructor: `new` and `with_beans` funnel through here
    // so default field values are defined in one place.
    fn with_beans_and_platform_service(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            // Fresh, empty language registry; use `with_languages*` to share one.
            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
            beans,
            runtime: None,
            global_error_handler: None,
            crash_notifier: None,
            tracing_enabled: false,
            tracer_detail_level: DetailLevel::Minimal,
            tracer_metrics: None,
            platform_service,
            function_invoker: None,
        }
    }
357
358    /// Create a new `DefaultRouteController` with shared language registry.
359    pub fn with_languages(
360        registry: Arc<std::sync::Mutex<Registry>>,
361        languages: SharedLanguageRegistry,
362        platform_service: Arc<dyn PlatformService>,
363    ) -> Self {
364        Self {
365            routes: HashMap::new(),
366            registry,
367            languages,
368            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
369            runtime: None,
370            global_error_handler: None,
371            crash_notifier: None,
372            tracing_enabled: false,
373            tracer_detail_level: DetailLevel::Minimal,
374            tracer_metrics: None,
375            platform_service,
376            function_invoker: None,
377        }
378    }
379
    /// Create a new `DefaultRouteController` with shared language and bean
    /// registries.
    pub fn with_languages_and_beans(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        platform_service: Arc<dyn PlatformService>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
    ) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            languages,
            beans,
            runtime: None,
            global_error_handler: None,
            crash_notifier: None,
            tracing_enabled: false,
            tracer_detail_level: DetailLevel::Minimal,
            tracer_metrics: None,
            platform_service,
            function_invoker: None,
        }
    }
401
    /// Builder-style variant of [`Self::set_function_invoker`].
    pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
        self.function_invoker = Some(function_invoker);
        self
    }
406
    /// Set the invoker used for user-defined functions referenced by routes.
    pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
        self.function_invoker = Some(invoker);
    }
410
    /// Set runtime handle for ProducerContext creation.
    ///
    /// Stored as a `Weak` reference so the controller does not keep the
    /// runtime alive past shutdown.
    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.runtime = Some(Arc::downgrade(&runtime));
    }
415
    /// Set the crash notifier for supervision.
    ///
    /// When set, the controller will send a [`CrashNotification`] whenever
    /// a consumer crashes.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }
423
    /// Set a global error handler applied to all routes without a per-route handler.
    ///
    /// A per-route handler always takes precedence over this global config.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }
428
    /// Configure tracing for this route controller.
    ///
    /// Applies to pipelines compiled AFTER this call; already-built
    /// pipelines are not retrofitted.
    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
        self.tracing_enabled = config.enabled;
        self.tracer_detail_level = config.detail_level.clone();
        self.tracer_metrics = config.metrics_collector.clone();
    }
435
436    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
437        let mut producer_ctx = ProducerContext::new();
438        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
439            producer_ctx = producer_ctx.with_runtime(runtime);
440        }
441        Ok(producer_ctx)
442    }
443
444    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
445    fn resolve_error_handler(
446        &self,
447        config: ErrorHandlerConfig,
448        producer_ctx: &ProducerContext,
449        component_ctx: &dyn ComponentContext,
450    ) -> Result<ErrorHandlerLayer, CamelError> {
451        // Resolve DLC URI → producer.
452        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
453            let parsed = parse_uri(uri)?;
454            let component = component_ctx
455                .resolve_component(&parsed.scheme)
456                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
457            let endpoint = component.create_endpoint(uri, component_ctx)?;
458            Some(endpoint.create_producer(producer_ctx)?)
459        } else {
460            None
461        };
462
463        // Resolve per-policy `handled_by` URIs.
464        let mut resolved_policies = Vec::new();
465        for policy in config.policies {
466            let handler_producer = if let Some(ref uri) = policy.handled_by {
467                let parsed = parse_uri(uri)?;
468                let component = component_ctx
469                    .resolve_component(&parsed.scheme)
470                    .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
471                let endpoint = component.create_endpoint(uri, component_ctx)?;
472                Some(endpoint.create_producer(producer_ctx)?)
473            } else {
474                None
475            };
476            resolved_policies.push((policy, handler_producer));
477        }
478
479        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
480    }
481
    /// Resolve a `UnitOfWorkConfig` into an `(ExchangeUoWLayer, Arc<AtomicU64>)`.
    /// Returns `Err` if any hook URI cannot be resolved.
    ///
    /// `counter` lets the caller reuse an existing in-flight counter (e.g.
    /// during hot reload); when `None`, a fresh zeroed counter is created.
    fn resolve_uow_layer(
        &self,
        config: &UnitOfWorkConfig,
        producer_ctx: &ProducerContext,
        component_ctx: &dyn ComponentContext,
        counter: Option<Arc<AtomicU64>>,
    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
        // Turn a hook URI into a producer, attaching a UoW-specific error message.
        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
            let parsed = parse_uri(uri)?;
            let component = component_ctx
                .resolve_component(&parsed.scheme)
                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
            let endpoint = component.create_endpoint(uri, component_ctx)?;
            endpoint.create_producer(producer_ctx).map_err(|e| {
                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
            })
        };

        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;

        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
        Ok((layer, counter))
    }
509
    /// Resolve BuilderSteps into BoxProcessors.
    ///
    /// Each resolved processor is paired with an optional body-type contract
    /// used for typed pipeline composition. Delegates the per-step work to
    /// `step_resolution::resolve_steps`.
    pub(crate) fn resolve_steps(
        &self,
        steps: Vec<BuilderStep>,
        producer_ctx: &ProducerContext,
        registry: &Arc<std::sync::Mutex<Registry>>,
        route_id: Option<&str>,
        staging_mode: &super::step_resolution::FunctionStagingMode,
    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
        // Context handed to components; falls back to no-op metrics when no
        // tracer metrics collector is configured.
        let component_ctx = Arc::new(ControllerComponentContext::new(
            Arc::clone(registry),
            Arc::clone(&self.languages),
            self.tracer_metrics
                .clone()
                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            Arc::clone(&self.platform_service),
        ));

        super::step_resolution::resolve_steps(
            steps,
            producer_ctx,
            registry,
            &self.languages,
            &self.beans,
            self.function_invoker.clone(),
            component_ctx,
            route_id,
            staging_mode,
        )
    }
540
    /// Add a route definition to the controller.
    ///
    /// Steps are resolved immediately using the registry.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - A route with the same ID already exists
    /// - Step resolution fails
    /// - Committing staged functions fails
    pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
        let route_id = definition.route_id().to_string();

        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, "Adding route to controller");

        let prepared = match self.build_managed_route(
            definition,
            &super::step_resolution::FunctionStagingMode::DirectAdd,
        ) {
            Ok(prepared) => prepared,
            Err(err) => {
                // Roll back any functions staged during the failed build.
                self.discard_function_staging();
                return Err(err);
            }
        };

        // Commit staged functions BEFORE registering the route; on commit
        // failure the staging is discarded and the route is not added.
        if let Some(invoker) = &self.function_invoker
            && let Err(err) = invoker.commit_staged().await
        {
            invoker.discard_staging(0);
            return Err(CamelError::Config(err.to_string()));
        }

        self.routes
            .insert(prepared.route_id.clone(), prepared.managed);

        Ok(())
    }
585
    /// Compile a [`RouteDefinition`] into a [`PreparedRoute`] without
    /// registering it with the controller.
    ///
    /// Pipeline layering (inside → out): step pipeline → circuit breaker →
    /// error handler → unit of work. When the route's first top-level
    /// aggregate step has a timeout completion condition, the steps are
    /// split into pre/post pipelines around the aggregator instead.
    fn build_managed_route(
        &self,
        definition: RouteDefinition,
        staging_mode: &super::step_resolution::FunctionStagingMode,
    ) -> Result<PreparedRoute, CamelError> {
        let route_id = definition.route_id().to_string();

        // Capture introspection metadata before destructuring consumes the definition.
        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        let producer_ctx = self.build_producer_context()?;

        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
            Some((idx, agg_config)) => {
                // Split the steps around the aggregate step: [pre] [aggregate] [post].
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(
                    pre_steps,
                    &producer_ctx,
                    &self.registry,
                    Some(&route_id),
                    staging_mode,
                )?;
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(
                    post_steps,
                    &producer_ctx,
                    &self.registry,
                    Some(&route_id),
                    staging_mode,
                )?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // The main pipeline stays empty (identity below); the split
                // pre/post pipelines drive this route instead.
                vec![]
            }
            None => self.resolve_steps(
                steps,
                &producer_ctx,
                &self.registry,
                Some(&route_id),
                staging_mode,
            )?,
        };
        let route_id_for_tracing = route_id.clone();
        let mut pipeline = if processors_with_contracts.is_empty() {
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        // Circuit breaker wraps the step pipeline.
        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Per-route error handler takes precedence over the global one.
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Unit-of-work layer goes outermost so its in-flight counter and
        // hooks observe every exchange, including error-handled ones.
        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let (uow_layer, counter) =
                self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        Ok(PreparedRoute {
            route_id,
            managed: ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
            },
        })
    }
723
724    pub(crate) fn insert_prepared_route(
725        &mut self,
726        prepared: PreparedRoute,
727    ) -> Result<(), CamelError> {
728        if self.routes.contains_key(&prepared.route_id) {
729            return Err(CamelError::RouteError(format!(
730                "Route '{}' already exists",
731                prepared.route_id
732            )));
733        }
734        self.routes
735            .insert(prepared.route_id.clone(), prepared.managed);
736        Ok(())
737    }
738
    /// Add a route compiled under a hot-reload `generation`.
    ///
    /// Unlike [`Self::add_route`], staged functions are neither committed nor
    /// discarded here; the hot-reload flow finalizes staging separately.
    /// NOTE(review): on build failure nothing is discarded here — confirm the
    /// hot-reload caller cleans up staged functions.
    pub async fn add_route_with_generation(
        &mut self,
        definition: RouteDefinition,
        generation: u64,
    ) -> Result<(), CamelError> {
        let route_id = definition.route_id().to_string();

        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, generation, "Adding route to controller with generation");

        let prepared = self.build_managed_route(
            definition,
            &super::step_resolution::FunctionStagingMode::HotReload { generation },
        )?;

        self.routes
            .insert(prepared.route_id.clone(), prepared.managed);

        Ok(())
    }
765
    /// Compile a route definition for hot reload (under `generation`)
    /// without registering it with the controller.
    pub(crate) fn prepare_route_definition_with_generation(
        &self,
        definition: RouteDefinition,
        generation: u64,
    ) -> Result<PreparedRoute, CamelError> {
        self.build_managed_route(
            definition,
            &super::step_resolution::FunctionStagingMode::HotReload { generation },
        )
    }
776
777    pub async fn remove_route_preserving_functions(
778        &mut self,
779        route_id: &str,
780    ) -> Result<(), CamelError> {
781        let managed = self.routes.get(route_id).ok_or_else(|| {
782            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
783        })?;
784        if handle_is_running(&managed.consumer_handle)
785            || handle_is_running(&managed.pipeline_handle)
786        {
787            return Err(CamelError::RouteError(format!(
788                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
789                route_id,
790                inferred_lifecycle_label(managed)
791            )));
792        }
793        self.routes.remove(route_id);
794        info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
795        Ok(())
796    }
797
    /// Compile a route definition into a standalone pipeline without
    /// registering it (uses the `DryCompile` function staging mode).
    ///
    /// Layering matches [`Self::build_managed_route`]: steps → circuit
    /// breaker → error handler → unit of work. If a route with the same ID
    /// is already registered, its existing in-flight counter is reused for
    /// the UoW layer so counts survive recompilation.
    pub fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<BoxProcessor, CamelError> {
        let route_id = def.route_id().to_string();

        let producer_ctx = self.build_producer_context()?;

        let processors_with_contracts = self.resolve_steps(
            def.steps,
            &producer_ctx,
            &self.registry,
            Some(&route_id),
            &super::step_resolution::FunctionStagingMode::DryCompile,
        )?;
        let mut pipeline = compose_traced_pipeline_with_contracts(
            processors_with_contracts,
            &route_id,
            self.tracing_enabled,
            self.tracer_detail_level.clone(),
            self.tracer_metrics.clone(),
        );

        // Circuit breaker wraps the step pipeline.
        if let Some(cb_config) = def.circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Per-route error handler takes precedence over the global one.
        let eh_config = def
            .error_handler
            .clone()
            .or_else(|| self.global_error_handler.clone());
        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Apply UoW layer outermost
        if let Some(uow_config) = &def.unit_of_work {
            // Reuse the registered route's counter (if any) so in-flight
            // tracking stays continuous across recompiles.
            let existing_counter = self
                .routes
                .get(&route_id)
                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));

            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );

            let (uow_layer, _counter) = self.resolve_uow_layer(
                uow_config,
                &producer_ctx,
                &component_ctx,
                existing_counter,
            )?;

            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
        }

        Ok(pipeline)
    }
871
872    pub fn compile_route_definition_with_generation(
873        &self,
874        def: RouteDefinition,
875        generation: u64,
876    ) -> Result<BoxProcessor, CamelError> {
877        let route_id = def.route_id().to_string();
878
879        let producer_ctx = self.build_producer_context()?;
880
881        let processors_with_contracts = self.resolve_steps(
882            def.steps,
883            &producer_ctx,
884            &self.registry,
885            Some(&route_id),
886            &super::step_resolution::FunctionStagingMode::HotReload { generation },
887        )?;
888        let mut pipeline = compose_traced_pipeline_with_contracts(
889            processors_with_contracts,
890            &route_id,
891            self.tracing_enabled,
892            self.tracer_detail_level.clone(),
893            self.tracer_metrics.clone(),
894        );
895
896        if let Some(cb_config) = def.circuit_breaker {
897            let cb_layer = CircuitBreakerLayer::new(cb_config);
898            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
899        }
900
901        let eh_config = def
902            .error_handler
903            .clone()
904            .or_else(|| self.global_error_handler.clone());
905        if let Some(config) = eh_config {
906            let component_ctx = ControllerComponentContext::new(
907                Arc::clone(&self.registry),
908                Arc::clone(&self.languages),
909                self.tracer_metrics
910                    .clone()
911                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
912                Arc::clone(&self.platform_service),
913            );
914            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
915            pipeline = BoxProcessor::new(layer.layer(pipeline));
916        }
917
918        if let Some(uow_config) = &def.unit_of_work {
919            let existing_counter = self
920                .routes
921                .get(&route_id)
922                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
923
924            let component_ctx = ControllerComponentContext::new(
925                Arc::clone(&self.registry),
926                Arc::clone(&self.languages),
927                self.tracer_metrics
928                    .clone()
929                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
930                Arc::clone(&self.platform_service),
931            );
932
933            let (uow_layer, _counter) = self.resolve_uow_layer(
934                uow_config,
935                &producer_ctx,
936                &component_ctx,
937                existing_counter,
938            )?;
939
940            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
941        }
942
943        Ok(pipeline)
944    }
945
946    /// Remove a route from the controller map.
947    ///
948    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
949    /// Returns an error if the route is still running or does not exist.
950    /// Does not cancel any running tasks — call `stop_route` first.
951    pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
952        let managed = self.routes.get(route_id).ok_or_else(|| {
953            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
954        })?;
955        if handle_is_running(&managed.consumer_handle)
956            || handle_is_running(&managed.pipeline_handle)
957        {
958            return Err(CamelError::RouteError(format!(
959                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
960                route_id,
961                inferred_lifecycle_label(managed)
962            )));
963        }
964        if let Some(invoker) = &self.function_invoker {
965            for (id, rid) in self.collect_function_refs(route_id) {
966                if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
967                    warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
968                }
969            }
970        }
971        self.routes.remove(route_id);
972        info!(route_id = %route_id, "Route removed from controller");
973        Ok(())
974    }
975
976    fn collect_function_refs(
977        &self,
978        route_id: &str,
979    ) -> Vec<(camel_api::FunctionId, Option<String>)> {
980        self.function_invoker
981            .as_ref()
982            .map(|invoker| invoker.function_refs_for_route(route_id))
983            .unwrap_or_default()
984    }
985
986    fn discard_function_staging(&self) {
987        if let Some(invoker) = &self.function_invoker {
988            invoker.discard_staging(0);
989        }
990    }
991
992    /// Returns the number of routes in the controller.
993    pub fn route_count(&self) -> usize {
994        self.routes.len()
995    }
996
997    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
998        self.routes.get(route_id).map(|r| {
999            r.in_flight
1000                .as_ref()
1001                .map_or(0, |c| c.load(Ordering::Relaxed))
1002        })
1003    }
1004
1005    /// Returns `true` if a route with the given ID exists.
1006    pub fn route_exists(&self, route_id: &str) -> bool {
1007        self.routes.contains_key(route_id)
1008    }
1009
1010    /// Returns all route IDs.
1011    pub fn route_ids(&self) -> Vec<String> {
1012        self.routes.keys().cloned().collect()
1013    }
1014
1015    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
1016        self.routes
1017            .get(route_id)
1018            .and_then(|m| m.definition.source_hash())
1019    }
1020
1021    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
1022    pub fn auto_startup_route_ids(&self) -> Vec<String> {
1023        let mut pairs: Vec<(String, i32)> = self
1024            .routes
1025            .iter()
1026            .filter(|(_, managed)| managed.definition.auto_startup())
1027            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1028            .collect();
1029        pairs.sort_by_key(|(_, order)| *order);
1030        pairs.into_iter().map(|(id, _)| id).collect()
1031    }
1032
1033    /// Returns route IDs sorted by shutdown order (startup order descending).
1034    pub fn shutdown_route_ids(&self) -> Vec<String> {
1035        let mut pairs: Vec<(String, i32)> = self
1036            .routes
1037            .iter()
1038            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1039            .collect();
1040        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1041        pairs.into_iter().map(|(id, _)| id).collect()
1042    }
1043
1044    /// Atomically swap the pipeline of a route.
1045    ///
1046    /// In-flight requests finish with the old pipeline (kept alive by Arc).
1047    /// New requests immediately use the new pipeline.
1048    pub fn swap_pipeline(
1049        &self,
1050        route_id: &str,
1051        new_pipeline: BoxProcessor,
1052    ) -> Result<(), CamelError> {
1053        let managed = self
1054            .routes
1055            .get(route_id)
1056            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1057
1058        if managed.aggregate_split.is_some() {
1059            tracing::warn!(
1060                route_id = %route_id,
1061                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
1062            );
1063        }
1064
1065        managed
1066            .pipeline
1067            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1068        info!(route_id = %route_id, "Pipeline swapped atomically");
1069        Ok(())
1070    }
1071
1072    /// Returns the from_uri of a route, if it exists.
1073    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1074        self.routes.get(route_id).map(|r| r.from_uri.clone())
1075    }
1076
1077    /// Get a clone of the current pipeline for a route.
1078    ///
1079    /// This is useful for testing and introspection.
1080    /// Returns `None` if the route doesn't exist.
1081    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1082        self.routes
1083            .get(route_id)
1084            .map(|r| r.pipeline.load().0.clone())
1085    }
1086
    /// Internal stop implementation; delegates to
    /// `consumer_management::stop_route_internal` with the controller's route
    /// map. (Takes no status argument — the old "custom status" wording no
    /// longer applies to this signature.)
    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
    }
1091
    /// Start a route as part of a reload flow. Currently a thin alias for
    /// `start_route`; kept as a dedicated entry point for reload callers.
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }
1095
    /// Stop a route as part of a reload flow. Currently a thin alias for
    /// `stop_route`; kept as a dedicated entry point for reload callers.
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
1099}
1100
1101#[async_trait::async_trait]
1102impl RouteController for DefaultRouteController {
    /// Start a route: build its consumer, spawn the pipeline task(s), then
    /// spawn the consumer task, and store the handles on the managed route.
    ///
    /// Idempotent when both tasks are already running (returns `Ok`). Errors
    /// when the route is suspended (pipeline up, consumer down — use
    /// `resume_route`) or in an inconsistent half-running state. Aggregate
    /// routes with a timeout take a dedicated branch that runs a biased
    /// `select!` loop over late aggregator output, incoming exchanges, and
    /// cancellation.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Check if route exists and can be started.
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            if consumer_running && pipeline_running {
                return Ok(());
            }
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
        }

        info!(route_id = %route_id, "Starting route");

        // Get the resolved route info
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        // Clone crash notifier for consumer task
        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            ),
        )?;

        // Resolve effective concurrency: route override > consumer default
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        // Get the managed route for mutation
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create channel for consumer to send exchanges
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        // Create child tokens for independent lifecycle control
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        // Clone sender for storage (to reuse on resume)
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        // --- Aggregator v2: check for aggregate route with timeout ---
        // NOTE(review): this re-fetch duplicates the `managed` borrow above
        // with no await in between; kept as-is to avoid touching borrow flow.
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        if let Some(split) = managed.aggregate_split.as_ref() {
            // Channel for aggregations completed by timeout ("late" output
            // that has no originating envelope to reply to).
            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);

            let route_cancel_clone = pipeline_cancel.clone();
            let svc = AggregatorService::new(
                split.agg_config.clone(),
                late_tx,
                Arc::clone(&self.languages),
                route_cancel_clone,
            );
            let agg = Arc::new(std::sync::Mutex::new(svc));

            managed.agg_service = Some(Arc::clone(&agg));

            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
            let pre_pipeline = Arc::clone(&split.pre_pipeline);
            let post_pipeline = Arc::clone(&split.post_pipeline);

            // Spawn biased select forward loop
            let pipeline_handle = tokio::spawn(async move {
                loop {
                    // `biased` gives late (timed-out) aggregations priority
                    // over new envelopes on each iteration.
                    tokio::select! {
                        biased;

                        late_ex = async {
                            let mut rx = late_rx.lock().await;
                            rx.recv().await
                        } => {
                            match late_ex {
                                Some(ex) => {
                                    let pipe = post_pipeline.load();
                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
                                    }
                                }
                                None => return,
                            }
                        }

                        envelope_opt = rx.recv() => {
                            match envelope_opt {
                                Some(envelope) => {
                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
                                    // Run pre-aggregation segment; failures are
                                    // replied (if requested) and the envelope skipped.
                                    let pre_pipe = pre_pipeline.load();
                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
                                        Ok(ex) => ex,
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                            continue;
                                        }
                                    };

                                    // Clone the service out of the std Mutex so the
                                    // guard is dropped before awaiting the call.
                                    let ex = {
                                        let cloned_svc = agg
                                            .lock()
                                            .expect("mutex poisoned: another thread panicked while holding this lock")
                                            .clone();
                                        cloned_svc.oneshot(ex).await
                                    };

                                    match ex {
                                        Ok(ex) => {
                                            // Completed aggregations continue into the
                                            // post segment; pending ones are acked as-is.
                                            if !is_pending(&ex) {
                                                let post_pipe = post_pipeline.load();
                                                let out = post_pipe.0.clone().oneshot(ex).await;
                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
                                            } else if let Some(tx) = reply_tx {
                                                let _ = tx.send(Ok(ex));
                                            }
                                        }
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                        }
                                    }
                                }
                                None => return,
                            }
                        }

                        _ = pipeline_cancel.cancelled() => {
                            // On shutdown: flush open aggregations, then drain any
                            // late output already queued before exiting the loop.
                            {
                                let guard = agg
                                    .lock()
                                    .expect("mutex poisoned: another thread panicked while holding this lock");
                                guard.force_complete_all();
                            }
                            let mut rx_guard = late_rx.lock().await;
                            while let Ok(late_ex) = rx_guard.try_recv() {
                                let pipe = post_pipeline.load();
                                let _ = pipe.0.clone().oneshot(late_ex).await;
                            }
                            break;
                        }
                    }
                }
            });
            #[cfg(test)]
            emit_start_route_event("pipeline_spawned");

            // Start consumer after pipeline loop is spawned to avoid startup races
            // where consumers emit exchanges before the route pipeline begins polling.
            let consumer_handle = super::consumer_management::spawn_consumer_task(
                route_id.to_string(),
                consumer,
                consumer_ctx,
                crash_notifier,
                runtime_for_consumer,
                false,
            );
            #[cfg(test)]
            emit_start_route_event("consumer_spawned");

            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist");
            managed.consumer_handle = Some(consumer_handle);
            managed.pipeline_handle = Some(pipeline_handle);
            managed.channel_sender = Some(tx_for_storage);

            info!(route_id = %route_id, "Route started (aggregate with timeout)");
            return Ok(());
        }
        // --- End aggregator v2 branch ---

        // Spawn pipeline task with its own cancellation token
        let pipeline_handle = match effective_concurrency {
            ConcurrencyModel::Sequential => {
                // One task processes envelopes strictly in arrival order.
                tokio::spawn(async move {
                    loop {
                        // Use select! to exit promptly on cancellation even when idle
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, // Channel closed
                            },
                            _ = pipeline_cancel.cancelled() => {
                                // Cancellation requested - exit gracefully
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
                        let mut pipeline = pipeline.load().0.clone();

                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            ConcurrencyModel::Concurrent { max } => {
                // Each envelope gets its own task; `max` (when set) bounds
                // parallelism via a semaphore.
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        // Use select! to exit promptly on cancellation even when idle
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, // Channel closed
                            },
                            _ = pipeline_cancel.cancelled() => {
                                // Cancellation requested - exit gracefully
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        tokio::spawn(async move {
                            // Acquire semaphore permit if bounded
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            // Load current pipeline from ArcSwap
                            let mut pipe = pipe_ref.load().0.clone();

                            // Wait for service ready with circuit breaker backoff
                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };
        #[cfg(test)]
        emit_start_route_event("pipeline_spawned");

        // Start consumer after pipeline task is spawned to minimize the chance of
        // fire-and-forget events being produced before the pipeline loop is active.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            false,
        );
        #[cfg(test)]
        emit_start_route_event("consumer_spawned");

        // Store handles and update status
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }
1428
    /// Stop a route; delegates to the shared internal stop implementation.
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }
1432
1433    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1434        self.stop_route(route_id).await?;
1435        tokio::time::sleep(Duration::from_millis(100)).await;
1436        self.start_route(route_id).await
1437    }
1438
1439    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1440        // Check route exists and state.
1441        let managed = self
1442            .routes
1443            .get_mut(route_id)
1444            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1445
1446        let consumer_running = handle_is_running(&managed.consumer_handle);
1447        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1448
1449        // Can only suspend from active started state.
1450        if !consumer_running || !pipeline_running {
1451            return Err(CamelError::RouteError(format!(
1452                "Cannot suspend route '{}' with execution lifecycle {}",
1453                route_id,
1454                inferred_lifecycle_label(managed)
1455            )));
1456        }
1457
1458        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1459
1460        // Cancel consumer token only (keep pipeline running)
1461        let managed = self
1462            .routes
1463            .get_mut(route_id)
1464            .expect("invariant: route must exist after prior existence check");
1465        managed.consumer_cancel_token.cancel();
1466
1467        // Take and join consumer handle
1468        let managed = self
1469            .routes
1470            .get_mut(route_id)
1471            .expect("invariant: route must exist after prior existence check");
1472        let consumer_handle = managed.consumer_handle.take();
1473
1474        // Wait for consumer task to complete with timeout
1475        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1476            if let Some(handle) = consumer_handle {
1477                let _ = handle.await;
1478            }
1479        })
1480        .await;
1481
1482        if timeout_result.is_err() {
1483            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1484        }
1485
1486        // Get the managed route again (can't hold across await)
1487        let managed = self
1488            .routes
1489            .get_mut(route_id)
1490            .expect("invariant: route must exist after prior existence check");
1491
1492        // Create fresh cancellation token for consumer (for resume)
1493        managed.consumer_cancel_token = CancellationToken::new();
1494
1495        info!(route_id = %route_id, "Route suspended (pipeline still running)");
1496        Ok(())
1497    }
1498
    /// Resume a suspended route: spawn a fresh consumer that feeds the
    /// still-running pipeline through the channel sender stored at start.
    ///
    /// Errors if the route does not exist, is not in the suspended shape
    /// (consumer down, pipeline up), or has no stored channel sender.
    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Check route exists and is Suspended-equivalent execution state.
        let managed = self
            .routes
            .get(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);
        if consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        // Get the stored channel sender (must exist for a suspended route)
        let sender = managed.channel_sender.clone().ok_or_else(|| {
            CamelError::RouteError("Suspended route has no channel sender".into())
        })?;

        // Get from_uri for creating the new consumer
        let from_uri = managed.from_uri.clone();

        info!(route_id = %route_id, "Resuming route (spawning consumer only)");

        let (consumer, _) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            ),
        )?;

        // Get the managed route for mutation
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create child token for consumer lifecycle
        let consumer_cancel = managed.consumer_cancel_token.child_token();

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        // Create ConsumerContext with the stored sender
        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());

        // Spawn consumer task. NOTE(review): start_route passes `false` here
        // and resume passes `true` — presumably a resume flag; confirm against
        // spawn_consumer_task's signature.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            true,
        );

        // Store consumer handle and update status
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);

        info!(route_id = %route_id, "Route resumed");
        Ok(())
    }
1574
1575    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1576        // Only start routes where auto_startup() == true
1577        // Sort by startup_order() ascending before starting
1578        let route_ids: Vec<String> = {
1579            let mut pairs: Vec<_> = self
1580                .routes
1581                .iter()
1582                .filter(|(_, r)| r.definition.auto_startup())
1583                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1584                .collect();
1585            pairs.sort_by_key(|(_, order)| *order);
1586            pairs.into_iter().map(|(id, _)| id).collect()
1587        };
1588
1589        info!("Starting {} auto-startup routes", route_ids.len());
1590
1591        // Collect errors but continue starting remaining routes
1592        let mut errors: Vec<String> = Vec::new();
1593        for route_id in route_ids {
1594            if let Err(e) = self.start_route(&route_id).await {
1595                errors.push(format!("Route '{}': {}", route_id, e));
1596            }
1597        }
1598
1599        if !errors.is_empty() {
1600            return Err(CamelError::RouteError(format!(
1601                "Failed to start routes: {}",
1602                errors.join(", ")
1603            )));
1604        }
1605
1606        info!("All auto-startup routes started");
1607        Ok(())
1608    }
1609
1610    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1611        // Sort by startup_order descending (reverse order)
1612        let route_ids: Vec<String> = {
1613            let mut pairs: Vec<_> = self
1614                .routes
1615                .iter()
1616                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1617                .collect();
1618            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1619            pairs.into_iter().map(|(id, _)| id).collect()
1620        };
1621
1622        info!("Stopping {} routes", route_ids.len());
1623
1624        for route_id in route_ids {
1625            let _ = self.stop_route(&route_id).await;
1626        }
1627
1628        info!("All routes stopped");
1629        Ok(())
1630    }
1631}
1632
// Unit tests live in a sibling file (`route_controller_tests.rs`) to keep
// this module focused; the `#[path]` attribute wires that file in as the
// `tests` module, compiled only for test builds.
#[cfg(test)]
#[path = "route_controller_tests.rs"]
mod tests;