Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21    BoxProcessor, CamelError, Exchange, IdentityProcessor, NoOpMetrics, NoopPlatformService,
22    PlatformService, ProducerContext, RouteController, RuntimeCommand, RuntimeHandle,
23};
24use camel_component_api::{
25    ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
26};
27use camel_endpoint::parse_uri;
28pub use camel_processor::aggregator::SharedLanguageRegistry;
29use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
30use camel_processor::circuit_breaker::CircuitBreakerLayer;
31use camel_processor::error_handler::ErrorHandlerLayer;
32
33use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
34use crate::lifecycle::adapters::route_compiler::{
35    compose_pipeline, compose_traced_pipeline_with_contracts,
36};
37use crate::lifecycle::application::route_definition::{
38    BuilderStep, RouteDefinition, RouteDefinitionInfo,
39};
40use crate::shared::components::domain::Registry;
41use crate::shared::observability::domain::{DetailLevel, TracerConfig};
42use arc_swap::ArcSwap;
43use camel_bean::BeanRegistry;
44
/// Notification sent when a route crashes.
///
/// Delivered through the channel registered via
/// `DefaultRouteController::set_crash_notifier` so a supervisor can react.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// The ID of the crashed route.
    pub route_id: String,
    /// The error that caused the crash.
    pub error: String,
}
53
/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
///
/// # Safety
///
/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
///
/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
///    (creates a new boxed service from the inner clone).
/// 2. The clone is owned by the calling thread and never shared.
/// 3. ArcSwap guarantees we only get & references (no &mut).
///
/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
/// clone() does not mutate shared state and each thread gets an independent copy.
///
/// NOTE(review): this argument additionally relies on every stored service's
/// `Clone` impl being free of interior mutability — confirm that invariant
/// holds for all processors placed in a `SharedPipeline`.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: see the struct-level `# Safety` discussion — access is clone-only.
unsafe impl Sync for SyncBoxProcessor {}

/// Pipeline slot that supports lock-free atomic replacement (hot-reload).
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
72
/// Test-only callback type used to observe route-start events.
#[cfg(test)]
type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;

/// Test-only global slot holding the currently installed hook (if any).
#[cfg(test)]
static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));

/// Install (or clear, with `None`) the test hook for route-start events.
#[cfg(test)]
fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
    let mut slot = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    *slot = hook;
}

/// Invoke the installed test hook with `event`; a no-op when no hook is set.
#[cfg(test)]
fn emit_start_route_event(event: &'static str) {
    let guard = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    if let Some(hook) = guard.as_ref() {
        hook(event);
    }
}
97
/// Pipeline segments for a route whose first top-level aggregate step has a
/// timeout completion condition; the route's step pipeline is split around the
/// aggregator (see `find_top_level_aggregate_with_timeout` / `add_route`).
pub(super) struct AggregateSplitInfo {
    /// Pipeline built from the steps before the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    /// Aggregator configuration extracted from the route definition.
    pub(super) agg_config: AggregatorConfig,
    /// Pipeline built from the steps after the aggregate step.
    pub(super) post_pipeline: SharedPipeline,
}
104
/// Internal state for a managed route.
pub(super) struct ManagedRoute {
    /// The route definition metadata (for introspection).
    pub(super) definition: RouteDefinitionInfo,
    /// Source endpoint URI.
    pub(super) from_uri: String,
    /// Resolved processor pipeline (wrapped for atomic swap).
    pub(super) pipeline: SharedPipeline,
    /// Concurrency model override (if any).
    pub(super) concurrency: Option<ConcurrencyModel>,
    /// Handle for the consumer task (if running).
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping the consumer task.
    /// This allows independent control of the consumer lifecycle (for suspend/resume).
    pub(super) consumer_cancel_token: CancellationToken,
    /// Cancellation token for stopping the pipeline task.
    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
    pub(super) pipeline_cancel_token: CancellationToken,
    /// Channel sender for sending exchanges to the pipeline.
    /// Stored to allow resuming a suspended route without recreating the channel.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// In-flight exchange counter. `None` when UoW is not configured for this route.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Split pipeline segments for a timeout-driven aggregate route, if applicable.
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    /// Running aggregator service for a timeout-driven aggregate route, if any.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
132
133pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
134    handle.as_ref().is_some_and(|h| !h.is_finished())
135}
136
137fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
138    match (
139        handle_is_running(&managed.consumer_handle),
140        handle_is_running(&managed.pipeline_handle),
141    ) {
142        (true, true) => "Started",
143        (false, true) => "Suspended",
144        (true, false) => "Stopping",
145        (false, false) => "Stopped",
146    }
147}
148
149fn find_top_level_aggregate_with_timeout(
150    steps: &[BuilderStep],
151) -> Option<(usize, AggregatorConfig)> {
152    for (i, step) in steps.iter().enumerate() {
153        if let BuilderStep::Aggregate { config } = step {
154            if has_timeout_condition(&config.completion) {
155                return Some((i, config.clone()));
156            }
157            break;
158        }
159    }
160    None
161}
162
/// `ComponentContext` implementation that adapts the controller's shared
/// registries and services for component/endpoint resolution.
pub(crate) struct ControllerComponentContext {
    /// Component registry used to resolve URI schemes to components.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Language registry used to resolve expression languages by name.
    languages: SharedLanguageRegistry,
    /// Metrics collector exposed to components.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service exposed to components.
    platform_service: Arc<dyn PlatformService>,
}

impl ControllerComponentContext {
    /// Bundle the given registries and services into a context object.
    pub(crate) fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        metrics: Arc<dyn MetricsCollector>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            registry,
            languages,
            metrics,
            platform_service,
        }
    }
}
185
impl ComponentContext for ControllerComponentContext {
    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
        // A poisoned registry lock is treated as "not found" rather than a panic.
        self.registry.lock().ok()?.get(scheme)
    }

    fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
        // Same poisoning policy as `resolve_component`: poisoned lock → None.
        self.languages.lock().ok()?.get(name).cloned()
    }

    fn metrics(&self) -> Arc<dyn MetricsCollector> {
        Arc::clone(&self.metrics)
    }

    fn platform_service(&self) -> Arc<dyn PlatformService> {
        Arc::clone(&self.platform_service)
    }
}
203
204fn is_pending(ex: &Exchange) -> bool {
205    ex.property("CamelAggregatorPending")
206        .and_then(|v| v.as_bool())
207        .unwrap_or(false)
208}
209
/// Wait for a pipeline service to be ready with circuit breaker backoff.
///
/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
/// cancellation checks. It returns `Ok(())` when the service is ready, or
/// `Err(e)` if cancellation occurred or a fatal error was encountered.
async fn ready_with_backoff(
    pipeline: &mut BoxProcessor,
    cancel: &CancellationToken,
) -> Result<(), CamelError> {
    loop {
        match pipeline.ready().await {
            Ok(_) => return Ok(()),
            Err(CamelError::CircuitOpen(ref msg)) => {
                // An open circuit is treated as transient: retry after a fixed
                // backoff unless the caller cancels first.
                warn!("Circuit open, backing off: {msg}");
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                        continue;
                    }
                    _ = cancel.cancelled() => {
                        // Shutting down — don't retry; surface the last error.
                        return Err(CamelError::CircuitOpen(msg.clone()));
                    }
                }
            }
            Err(e) => {
                // Any other readiness error is considered fatal.
                error!("Pipeline not ready: {e}");
                return Err(e);
            }
        }
    }
}
242
243fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
244    let stamp = std::time::SystemTime::now()
245        .duration_since(std::time::UNIX_EPOCH)
246        .unwrap_or_default()
247        .as_nanos();
248    RuntimeCommand::FailRoute {
249        route_id: route_id.to_string(),
250        error: error.to_string(),
251        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
252        causation_id: None,
253    }
254}
255
256pub(super) async fn publish_runtime_failure(
257    runtime: Option<Weak<dyn RuntimeHandle>>,
258    route_id: &str,
259    error: &str,
260) {
261    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
262        return;
263    };
264    let command = runtime_failure_command(route_id, error);
265    if let Err(runtime_error) = runtime.execute(command).await {
266        warn!(
267            route_id = %route_id,
268            error = %runtime_error,
269            "failed to synchronize route crash with runtime projection"
270        );
271    }
272}
273
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    /// Held as `Weak` so the controller never keeps the runtime alive.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform service passed to component contexts during endpoint resolution.
    platform_service: Arc<dyn PlatformService>,
}
304
305impl DefaultRouteController {
    /// Create a new `DefaultRouteController` with the given registry.
    ///
    /// Uses a fresh, empty bean registry; see [`Self::with_beans`] to share one.
    pub fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
            platform_service,
        )
    }
317
    /// Create a new `DefaultRouteController` with shared bean registry.
    ///
    /// Installs a no-op platform service; use [`Self::new`] (or another
    /// constructor) to supply a real one.
    pub fn with_beans(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            beans,
            Arc::new(NoopPlatformService::default()),
        )
    }
329
330    fn with_beans_and_platform_service(
331        registry: Arc<std::sync::Mutex<Registry>>,
332        beans: Arc<std::sync::Mutex<BeanRegistry>>,
333        platform_service: Arc<dyn PlatformService>,
334    ) -> Self {
335        Self {
336            routes: HashMap::new(),
337            registry,
338            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
339            beans,
340            runtime: None,
341            global_error_handler: None,
342            crash_notifier: None,
343            tracing_enabled: false,
344            tracer_detail_level: DetailLevel::Minimal,
345            tracer_metrics: None,
346            platform_service,
347        }
348    }
349
350    /// Create a new `DefaultRouteController` with shared language registry.
351    pub fn with_languages(
352        registry: Arc<std::sync::Mutex<Registry>>,
353        languages: SharedLanguageRegistry,
354        platform_service: Arc<dyn PlatformService>,
355    ) -> Self {
356        Self {
357            routes: HashMap::new(),
358            registry,
359            languages,
360            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
361            runtime: None,
362            global_error_handler: None,
363            crash_notifier: None,
364            tracing_enabled: false,
365            tracer_detail_level: DetailLevel::Minimal,
366            tracer_metrics: None,
367            platform_service,
368        }
369    }
370
    /// Create a new `DefaultRouteController` with shared language and bean
    /// registries plus an explicit platform service (the most general
    /// constructor; the others delegate their defaults here or inline them).
    pub fn with_languages_and_beans(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        platform_service: Arc<dyn PlatformService>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
    ) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            languages,
            beans,
            runtime: None,
            global_error_handler: None,
            crash_notifier: None,
            tracing_enabled: false,
            tracer_detail_level: DetailLevel::Minimal,
            tracer_metrics: None,
            platform_service,
        }
    }
391
    /// Set runtime handle for ProducerContext creation.
    ///
    /// Only a `Weak` reference is stored, so the controller does not keep the
    /// runtime alive; contexts attach it only while it can still be upgraded.
    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.runtime = Some(Arc::downgrade(&runtime));
    }
396
    /// Set the crash notifier for supervision.
    ///
    /// When set, the controller will send a [`CrashNotification`] whenever
    /// a consumer crashes.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }
404
    /// Set a global error handler applied to all routes without a per-route handler.
    /// Per-route handlers always take precedence (see `add_route`).
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }
409
    /// Configure tracing for this route controller.
    ///
    /// Affects pipelines compiled after this call; already-built pipelines
    /// keep their previous tracing configuration until recompiled.
    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
        self.tracing_enabled = config.enabled;
        self.tracer_detail_level = config.detail_level.clone();
        self.tracer_metrics = config.metrics_collector.clone();
    }
416
417    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
418        let mut producer_ctx = ProducerContext::new();
419        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
420            producer_ctx = producer_ctx.with_runtime(runtime);
421        }
422        Ok(producer_ctx)
423    }
424
425    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
426    fn resolve_error_handler(
427        &self,
428        config: ErrorHandlerConfig,
429        producer_ctx: &ProducerContext,
430        component_ctx: &dyn ComponentContext,
431    ) -> Result<ErrorHandlerLayer, CamelError> {
432        // Resolve DLC URI → producer.
433        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
434            let parsed = parse_uri(uri)?;
435            let component = component_ctx
436                .resolve_component(&parsed.scheme)
437                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
438            let endpoint = component.create_endpoint(uri, component_ctx)?;
439            Some(endpoint.create_producer(producer_ctx)?)
440        } else {
441            None
442        };
443
444        // Resolve per-policy `handled_by` URIs.
445        let mut resolved_policies = Vec::new();
446        for policy in config.policies {
447            let handler_producer = if let Some(ref uri) = policy.handled_by {
448                let parsed = parse_uri(uri)?;
449                let component = component_ctx
450                    .resolve_component(&parsed.scheme)
451                    .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
452                let endpoint = component.create_endpoint(uri, component_ctx)?;
453                Some(endpoint.create_producer(producer_ctx)?)
454            } else {
455                None
456            };
457            resolved_policies.push((policy, handler_producer));
458        }
459
460        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
461    }
462
    /// Resolve a `UnitOfWorkConfig` into an `(ExchangeUoWLayer, Arc<AtomicU64>)`.
    /// Returns `Err` if any hook URI cannot be resolved.
    ///
    /// When `counter` is `Some`, that counter is reused (so hot-reload keeps
    /// the existing in-flight count); otherwise a fresh zeroed counter is made.
    fn resolve_uow_layer(
        &self,
        config: &UnitOfWorkConfig,
        producer_ctx: &ProducerContext,
        component_ctx: &dyn ComponentContext,
        counter: Option<Arc<AtomicU64>>,
    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
        // URI → producer resolution for the completion/failure hooks.
        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
            let parsed = parse_uri(uri)?;
            let component = component_ctx
                .resolve_component(&parsed.scheme)
                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
            let endpoint = component.create_endpoint(uri, component_ctx)?;
            endpoint.create_producer(producer_ctx).map_err(|e| {
                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
            })
        };

        // `transpose` makes a missing hook fine while a bad URI is an error.
        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;

        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
        Ok((layer, counter))
    }
490
491    /// Resolve BuilderSteps into BoxProcessors.
492    pub(crate) fn resolve_steps(
493        &self,
494        steps: Vec<BuilderStep>,
495        producer_ctx: &ProducerContext,
496        registry: &Arc<std::sync::Mutex<Registry>>,
497    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
498        let component_ctx = Arc::new(ControllerComponentContext::new(
499            Arc::clone(registry),
500            Arc::clone(&self.languages),
501            self.tracer_metrics
502                .clone()
503                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
504            Arc::clone(&self.platform_service),
505        ));
506
507        super::step_resolution::resolve_steps(
508            steps,
509            producer_ctx,
510            registry,
511            &self.languages,
512            &self.beans,
513            component_ctx,
514        )
515    }
516
    /// Add a route definition to the controller.
    ///
    /// Steps are resolved immediately using the registry. The compiled
    /// pipeline is layered innermost-to-outermost as: steps → circuit breaker
    /// → error handler → unit-of-work.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - A route with the same ID already exists
    /// - Step resolution fails
    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
        let route_id = definition.route_id().to_string();

        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, "Adding route to controller");

        // Extract definition info for storage before steps are consumed
        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        // Create ProducerContext from self_ref for step resolution
        let producer_ctx = self.build_producer_context()?;

        // Take ownership of steps before resolve_steps consumes them.
        // When the first top-level aggregate step has a timeout condition, the
        // route is compiled as two separate segments around the aggregator.
        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
            Some((idx, agg_config)) => {
                // Split steps at the aggregate: [pre | aggregate | post].
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // No main-pipeline processors: the segments above carry the
                // work, so the main slot falls through to IdentityProcessor.
                vec![]
            }
            None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
        };
        let route_id_for_tracing = route_id.clone();
        let mut pipeline = if processors_with_contracts.is_empty() {
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        // Apply circuit breaker if configured
        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Determine which error handler config to use (per-route takes precedence)
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Apply UoW layer outermost (after error handler)
        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            // `None` counter: new routes always start with a fresh in-flight count.
            let (uow_layer, counter) =
                self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        self.routes.insert(
            route_id.clone(),
            ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
            },
        );

        Ok(())
    }
658
    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting into the route map.
    ///
    /// Used by hot-reload to prepare a new pipeline for atomic swap without disrupting
    /// the running route. The caller is responsible for swapping via `swap_pipeline`.
    ///
    /// Layering matches `add_route` (steps → circuit breaker → error handler →
    /// unit-of-work outermost). Unlike `add_route`, no aggregate split is
    /// performed here.
    pub fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<BoxProcessor, CamelError> {
        let route_id = def.route_id().to_string();

        let producer_ctx = self.build_producer_context()?;

        let processors_with_contracts =
            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
        let mut pipeline = compose_traced_pipeline_with_contracts(
            processors_with_contracts,
            &route_id,
            self.tracing_enabled,
            self.tracer_detail_level.clone(),
            self.tracer_metrics.clone(),
        );

        // Apply circuit breaker if configured.
        if let Some(cb_config) = def.circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Per-route error handler takes precedence over the global one.
        let eh_config = def
            .error_handler
            .clone()
            .or_else(|| self.global_error_handler.clone());
        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Apply UoW layer outermost
        if let Some(uow_config) = &def.unit_of_work {
            // Reuse the route's existing in-flight counter (if the route is
            // already managed) so a hot-reload does not reset in-flight counts.
            let existing_counter = self
                .routes
                .get(&route_id)
                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));

            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );

            let (uow_layer, _counter) = self.resolve_uow_layer(
                uow_config,
                &producer_ctx,
                &component_ctx,
                existing_counter,
            )?;

            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
        }

        Ok(pipeline)
    }
731
732    /// Remove a route from the controller map.
733    ///
734    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
735    /// Returns an error if the route is still running or does not exist.
736    /// Does not cancel any running tasks — call `stop_route` first.
737    pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
738        let managed = self.routes.get(route_id).ok_or_else(|| {
739            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
740        })?;
741        if handle_is_running(&managed.consumer_handle)
742            || handle_is_running(&managed.pipeline_handle)
743        {
744            return Err(CamelError::RouteError(format!(
745                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
746                route_id,
747                inferred_lifecycle_label(managed)
748            )));
749        }
750        self.routes.remove(route_id);
751        info!(route_id = %route_id, "Route removed from controller");
752        Ok(())
753    }
754
    /// Returns the number of routes in the controller (running or not).
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }
759
760    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
761        self.routes.get(route_id).map(|r| {
762            r.in_flight
763                .as_ref()
764                .map_or(0, |c| c.load(Ordering::Relaxed))
765        })
766    }
767
    /// Returns `true` if a route with the given ID exists (regardless of state).
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.contains_key(route_id)
    }
772
    /// Returns all route IDs (in unspecified order — backing map is a HashMap).
    pub fn route_ids(&self) -> Vec<String> {
        self.routes.keys().cloned().collect()
    }
777
    /// Returns the source hash recorded on a route's definition, if the route
    /// exists and its definition carries one.
    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.routes
            .get(route_id)
            .and_then(|m| m.definition.source_hash())
    }
783
784    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
785    pub fn auto_startup_route_ids(&self) -> Vec<String> {
786        let mut pairs: Vec<(String, i32)> = self
787            .routes
788            .iter()
789            .filter(|(_, managed)| managed.definition.auto_startup())
790            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
791            .collect();
792        pairs.sort_by_key(|(_, order)| *order);
793        pairs.into_iter().map(|(id, _)| id).collect()
794    }
795
796    /// Returns route IDs sorted by shutdown order (startup order descending).
797    pub fn shutdown_route_ids(&self) -> Vec<String> {
798        let mut pairs: Vec<(String, i32)> = self
799            .routes
800            .iter()
801            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
802            .collect();
803        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
804        pairs.into_iter().map(|(id, _)| id).collect()
805    }
806
807    /// Atomically swap the pipeline of a route.
808    ///
809    /// In-flight requests finish with the old pipeline (kept alive by Arc).
810    /// New requests immediately use the new pipeline.
811    pub fn swap_pipeline(
812        &self,
813        route_id: &str,
814        new_pipeline: BoxProcessor,
815    ) -> Result<(), CamelError> {
816        let managed = self
817            .routes
818            .get(route_id)
819            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
820
821        if managed.aggregate_split.is_some() {
822            tracing::warn!(
823                route_id = %route_id,
824                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
825            );
826        }
827
828        managed
829            .pipeline
830            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
831        info!(route_id = %route_id, "Pipeline swapped atomically");
832        Ok(())
833    }
834
835    /// Returns the from_uri of a route, if it exists.
836    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
837        self.routes.get(route_id).map(|r| r.from_uri.clone())
838    }
839
840    /// Get a clone of the current pipeline for a route.
841    ///
842    /// This is useful for testing and introspection.
843    /// Returns `None` if the route doesn't exist.
844    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
845        self.routes
846            .get(route_id)
847            .map(|r| r.pipeline.load().0.clone())
848    }
849
850    /// Internal stop implementation that can set custom status.
851    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
852        super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
853    }
854
855    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
856        self.start_route(route_id).await
857    }
858
859    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
860        self.stop_route(route_id).await
861    }
862}
863
864#[async_trait::async_trait]
865impl RouteController for DefaultRouteController {
866    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
867        // Check if route exists and can be started.
868        {
869            let managed = self
870                .routes
871                .get_mut(route_id)
872                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
873
874            let consumer_running = handle_is_running(&managed.consumer_handle);
875            let pipeline_running = handle_is_running(&managed.pipeline_handle);
876            if consumer_running && pipeline_running {
877                return Ok(());
878            }
879            if !consumer_running && pipeline_running {
880                return Err(CamelError::RouteError(format!(
881                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
882                    route_id
883                )));
884            }
885            if consumer_running && !pipeline_running {
886                return Err(CamelError::RouteError(format!(
887                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
888                    route_id
889                )));
890            }
891        }
892
893        info!(route_id = %route_id, "Starting route");
894
895        // Get the resolved route info
896        let (from_uri, pipeline, concurrency) = {
897            let managed = self
898                .routes
899                .get(route_id)
900                .expect("invariant: route must exist after prior existence check");
901            (
902                managed.from_uri.clone(),
903                Arc::clone(&managed.pipeline),
904                managed.concurrency.clone(),
905            )
906        };
907
908        // Clone crash notifier for consumer task
909        let crash_notifier = self.crash_notifier.clone();
910        let runtime_for_consumer = self.runtime.clone();
911
912        let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
913            &self.registry,
914            &from_uri,
915            &ControllerComponentContext::new(
916                Arc::clone(&self.registry),
917                Arc::clone(&self.languages),
918                self.tracer_metrics
919                    .clone()
920                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
921                Arc::clone(&self.platform_service),
922            ),
923        )?;
924
925        // Resolve effective concurrency: route override > consumer default
926        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
927
928        // Get the managed route for mutation
929        let managed = self
930            .routes
931            .get_mut(route_id)
932            .expect("invariant: route must exist after prior existence check");
933
934        // Create channel for consumer to send exchanges
935        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
936        // Create child tokens for independent lifecycle control
937        let consumer_cancel = managed.consumer_cancel_token.child_token();
938        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
939        // Clone sender for storage (to reuse on resume)
940        let tx_for_storage = tx.clone();
941        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
942
943        // --- Aggregator v2: check for aggregate route with timeout ---
944        let managed = self
945            .routes
946            .get_mut(route_id)
947            .expect("invariant: route must exist after prior existence check");
948
949        if let Some(split) = managed.aggregate_split.as_ref() {
950            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
951
952            let route_cancel_clone = pipeline_cancel.clone();
953            let svc = AggregatorService::new(
954                split.agg_config.clone(),
955                late_tx,
956                Arc::clone(&self.languages),
957                route_cancel_clone,
958            );
959            let agg = Arc::new(std::sync::Mutex::new(svc));
960
961            managed.agg_service = Some(Arc::clone(&agg));
962
963            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
964            let pre_pipeline = Arc::clone(&split.pre_pipeline);
965            let post_pipeline = Arc::clone(&split.post_pipeline);
966
967            // Spawn biased select forward loop
968            let pipeline_handle = tokio::spawn(async move {
969                loop {
970                    tokio::select! {
971                        biased;
972
973                        late_ex = async {
974                            let mut rx = late_rx.lock().await;
975                            rx.recv().await
976                        } => {
977                            match late_ex {
978                                Some(ex) => {
979                                    let pipe = post_pipeline.load();
980                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
981                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
982                                    }
983                                }
984                                None => return,
985                            }
986                        }
987
988                        envelope_opt = rx.recv() => {
989                            match envelope_opt {
990                                Some(envelope) => {
991                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
992                                    let pre_pipe = pre_pipeline.load();
993                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
994                                        Ok(ex) => ex,
995                                        Err(e) => {
996                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
997                                            continue;
998                                        }
999                                    };
1000
1001                                    let ex = {
1002                                        let cloned_svc = agg
1003                                            .lock()
1004                                            .expect("mutex poisoned: another thread panicked while holding this lock")
1005                                            .clone();
1006                                        cloned_svc.oneshot(ex).await
1007                                    };
1008
1009                                    match ex {
1010                                        Ok(ex) => {
1011                                            if !is_pending(&ex) {
1012                                                let post_pipe = post_pipeline.load();
1013                                                let out = post_pipe.0.clone().oneshot(ex).await;
1014                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
1015                                            } else if let Some(tx) = reply_tx {
1016                                                let _ = tx.send(Ok(ex));
1017                                            }
1018                                        }
1019                                        Err(e) => {
1020                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1021                                        }
1022                                    }
1023                                }
1024                                None => return,
1025                            }
1026                        }
1027
1028                        _ = pipeline_cancel.cancelled() => {
1029                            {
1030                                let guard = agg
1031                                    .lock()
1032                                    .expect("mutex poisoned: another thread panicked while holding this lock");
1033                                guard.force_complete_all();
1034                            }
1035                            let mut rx_guard = late_rx.lock().await;
1036                            while let Ok(late_ex) = rx_guard.try_recv() {
1037                                let pipe = post_pipeline.load();
1038                                let _ = pipe.0.clone().oneshot(late_ex).await;
1039                            }
1040                            break;
1041                        }
1042                    }
1043                }
1044            });
1045            #[cfg(test)]
1046            emit_start_route_event("pipeline_spawned");
1047
1048            // Start consumer after pipeline loop is spawned to avoid startup races
1049            // where consumers emit exchanges before the route pipeline begins polling.
1050            let consumer_handle = super::consumer_management::spawn_consumer_task(
1051                route_id.to_string(),
1052                consumer,
1053                consumer_ctx,
1054                crash_notifier,
1055                runtime_for_consumer,
1056                false,
1057            );
1058            #[cfg(test)]
1059            emit_start_route_event("consumer_spawned");
1060
1061            let managed = self
1062                .routes
1063                .get_mut(route_id)
1064                .expect("invariant: route must exist");
1065            managed.consumer_handle = Some(consumer_handle);
1066            managed.pipeline_handle = Some(pipeline_handle);
1067            managed.channel_sender = Some(tx_for_storage);
1068
1069            info!(route_id = %route_id, "Route started (aggregate with timeout)");
1070            return Ok(());
1071        }
1072        // --- End aggregator v2 branch ---
1073
1074        // Spawn pipeline task with its own cancellation token
1075        let pipeline_handle = match effective_concurrency {
1076            ConcurrencyModel::Sequential => {
1077                tokio::spawn(async move {
1078                    loop {
1079                        // Use select! to exit promptly on cancellation even when idle
1080                        let envelope = tokio::select! {
1081                            envelope = rx.recv() => match envelope {
1082                                Some(e) => e,
1083                                None => return, // Channel closed
1084                            },
1085                            _ = pipeline_cancel.cancelled() => {
1086                                // Cancellation requested - exit gracefully
1087                                return;
1088                            }
1089                        };
1090                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1091
1092                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
1093                        let mut pipeline = pipeline.load().0.clone();
1094
1095                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1096                            if let Some(tx) = reply_tx {
1097                                let _ = tx.send(Err(e));
1098                            }
1099                            return;
1100                        }
1101
1102                        let result = pipeline.call(exchange).await;
1103                        if let Some(tx) = reply_tx {
1104                            let _ = tx.send(result);
1105                        } else if let Err(ref e) = result
1106                            && !matches!(e, CamelError::Stopped)
1107                        {
1108                            error!("Pipeline error: {e}");
1109                        }
1110                    }
1111                })
1112            }
1113            ConcurrencyModel::Concurrent { max } => {
1114                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1115                tokio::spawn(async move {
1116                    loop {
1117                        // Use select! to exit promptly on cancellation even when idle
1118                        let envelope = tokio::select! {
1119                            envelope = rx.recv() => match envelope {
1120                                Some(e) => e,
1121                                None => return, // Channel closed
1122                            },
1123                            _ = pipeline_cancel.cancelled() => {
1124                                // Cancellation requested - exit gracefully
1125                                return;
1126                            }
1127                        };
1128                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1129                        let pipe_ref = Arc::clone(&pipeline);
1130                        let sem = sem.clone();
1131                        let cancel = pipeline_cancel.clone();
1132                        tokio::spawn(async move {
1133                            // Acquire semaphore permit if bounded
1134                            let _permit = match &sem {
1135                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
1136                                None => None,
1137                            };
1138
1139                            // Load current pipeline from ArcSwap
1140                            let mut pipe = pipe_ref.load().0.clone();
1141
1142                            // Wait for service ready with circuit breaker backoff
1143                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1144                                if let Some(tx) = reply_tx {
1145                                    let _ = tx.send(Err(e));
1146                                }
1147                                return;
1148                            }
1149
1150                            let result = pipe.call(exchange).await;
1151                            if let Some(tx) = reply_tx {
1152                                let _ = tx.send(result);
1153                            } else if let Err(ref e) = result
1154                                && !matches!(e, CamelError::Stopped)
1155                            {
1156                                error!("Pipeline error: {e}");
1157                            }
1158                        });
1159                    }
1160                })
1161            }
1162        };
1163        #[cfg(test)]
1164        emit_start_route_event("pipeline_spawned");
1165
1166        // Start consumer after pipeline task is spawned to minimize the chance of
1167        // fire-and-forget events being produced before the pipeline loop is active.
1168        let consumer_handle = super::consumer_management::spawn_consumer_task(
1169            route_id.to_string(),
1170            consumer,
1171            consumer_ctx,
1172            crash_notifier,
1173            runtime_for_consumer,
1174            false,
1175        );
1176        #[cfg(test)]
1177        emit_start_route_event("consumer_spawned");
1178
1179        // Store handles and update status
1180        let managed = self
1181            .routes
1182            .get_mut(route_id)
1183            .expect("invariant: route must exist after prior existence check");
1184        managed.consumer_handle = Some(consumer_handle);
1185        managed.pipeline_handle = Some(pipeline_handle);
1186        managed.channel_sender = Some(tx_for_storage);
1187
1188        info!(route_id = %route_id, "Route started");
1189        Ok(())
1190    }
1191
1192    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1193        self.stop_route_internal(route_id).await
1194    }
1195
1196    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1197        self.stop_route(route_id).await?;
1198        tokio::time::sleep(Duration::from_millis(100)).await;
1199        self.start_route(route_id).await
1200    }
1201
1202    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1203        // Check route exists and state.
1204        let managed = self
1205            .routes
1206            .get_mut(route_id)
1207            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1208
1209        let consumer_running = handle_is_running(&managed.consumer_handle);
1210        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1211
1212        // Can only suspend from active started state.
1213        if !consumer_running || !pipeline_running {
1214            return Err(CamelError::RouteError(format!(
1215                "Cannot suspend route '{}' with execution lifecycle {}",
1216                route_id,
1217                inferred_lifecycle_label(managed)
1218            )));
1219        }
1220
1221        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1222
1223        // Cancel consumer token only (keep pipeline running)
1224        let managed = self
1225            .routes
1226            .get_mut(route_id)
1227            .expect("invariant: route must exist after prior existence check");
1228        managed.consumer_cancel_token.cancel();
1229
1230        // Take and join consumer handle
1231        let managed = self
1232            .routes
1233            .get_mut(route_id)
1234            .expect("invariant: route must exist after prior existence check");
1235        let consumer_handle = managed.consumer_handle.take();
1236
1237        // Wait for consumer task to complete with timeout
1238        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1239            if let Some(handle) = consumer_handle {
1240                let _ = handle.await;
1241            }
1242        })
1243        .await;
1244
1245        if timeout_result.is_err() {
1246            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1247        }
1248
1249        // Get the managed route again (can't hold across await)
1250        let managed = self
1251            .routes
1252            .get_mut(route_id)
1253            .expect("invariant: route must exist after prior existence check");
1254
1255        // Create fresh cancellation token for consumer (for resume)
1256        managed.consumer_cancel_token = CancellationToken::new();
1257
1258        info!(route_id = %route_id, "Route suspended (pipeline still running)");
1259        Ok(())
1260    }
1261
1262    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1263        // Check route exists and is Suspended-equivalent execution state.
1264        let managed = self
1265            .routes
1266            .get(route_id)
1267            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1268
1269        let consumer_running = handle_is_running(&managed.consumer_handle);
1270        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1271        if consumer_running || !pipeline_running {
1272            return Err(CamelError::RouteError(format!(
1273                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1274                route_id,
1275                inferred_lifecycle_label(managed)
1276            )));
1277        }
1278
1279        // Get the stored channel sender (must exist for a suspended route)
1280        let sender = managed.channel_sender.clone().ok_or_else(|| {
1281            CamelError::RouteError("Suspended route has no channel sender".into())
1282        })?;
1283
1284        // Get from_uri and concurrency for creating new consumer
1285        let from_uri = managed.from_uri.clone();
1286
1287        info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1288
1289        let (consumer, _) = super::consumer_management::create_route_consumer(
1290            &self.registry,
1291            &from_uri,
1292            &ControllerComponentContext::new(
1293                Arc::clone(&self.registry),
1294                Arc::clone(&self.languages),
1295                self.tracer_metrics
1296                    .clone()
1297                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1298                Arc::clone(&self.platform_service),
1299            ),
1300        )?;
1301
1302        // Get the managed route for mutation
1303        let managed = self
1304            .routes
1305            .get_mut(route_id)
1306            .expect("invariant: route must exist after prior existence check");
1307
1308        // Create child token for consumer lifecycle
1309        let consumer_cancel = managed.consumer_cancel_token.child_token();
1310
1311        let crash_notifier = self.crash_notifier.clone();
1312        let runtime_for_consumer = self.runtime.clone();
1313
1314        // Create ConsumerContext with the stored sender
1315        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1316
1317        // Spawn consumer task
1318        let consumer_handle = super::consumer_management::spawn_consumer_task(
1319            route_id.to_string(),
1320            consumer,
1321            consumer_ctx,
1322            crash_notifier,
1323            runtime_for_consumer,
1324            true,
1325        );
1326
1327        // Store consumer handle and update status
1328        let managed = self
1329            .routes
1330            .get_mut(route_id)
1331            .expect("invariant: route must exist after prior existence check");
1332        managed.consumer_handle = Some(consumer_handle);
1333
1334        info!(route_id = %route_id, "Route resumed");
1335        Ok(())
1336    }
1337
1338    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1339        // Only start routes where auto_startup() == true
1340        // Sort by startup_order() ascending before starting
1341        let route_ids: Vec<String> = {
1342            let mut pairs: Vec<_> = self
1343                .routes
1344                .iter()
1345                .filter(|(_, r)| r.definition.auto_startup())
1346                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1347                .collect();
1348            pairs.sort_by_key(|(_, order)| *order);
1349            pairs.into_iter().map(|(id, _)| id).collect()
1350        };
1351
1352        info!("Starting {} auto-startup routes", route_ids.len());
1353
1354        // Collect errors but continue starting remaining routes
1355        let mut errors: Vec<String> = Vec::new();
1356        for route_id in route_ids {
1357            if let Err(e) = self.start_route(&route_id).await {
1358                errors.push(format!("Route '{}': {}", route_id, e));
1359            }
1360        }
1361
1362        if !errors.is_empty() {
1363            return Err(CamelError::RouteError(format!(
1364                "Failed to start routes: {}",
1365                errors.join(", ")
1366            )));
1367        }
1368
1369        info!("All auto-startup routes started");
1370        Ok(())
1371    }
1372
1373    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1374        // Sort by startup_order descending (reverse order)
1375        let route_ids: Vec<String> = {
1376            let mut pairs: Vec<_> = self
1377                .routes
1378                .iter()
1379                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1380                .collect();
1381            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1382            pairs.into_iter().map(|(id, _)| id).collect()
1383        };
1384
1385        info!("Stopping {} routes", route_ids.len());
1386
1387        for route_id in route_ids {
1388            let _ = self.stop_route(&route_id).await;
1389        }
1390
1391        info!("All routes stopped");
1392        Ok(())
1393    }
1394}
1395
1396#[cfg(test)]
1397#[path = "route_controller_tests.rs"]
1398mod tests;