// camel_core/lifecycle/adapters/route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21    BoxProcessor, CamelError, Exchange, IdentityProcessor, NoOpMetrics, NoopPlatformService,
22    PlatformService, ProducerContext, RouteController, RuntimeCommand, RuntimeHandle,
23};
24use camel_component_api::{
25    ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
26};
27use camel_endpoint::parse_uri;
28pub use camel_processor::aggregator::SharedLanguageRegistry;
29use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
30use camel_processor::circuit_breaker::CircuitBreakerLayer;
31use camel_processor::error_handler::ErrorHandlerLayer;
32
33use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
34use crate::lifecycle::adapters::route_compiler::{
35    compose_pipeline, compose_traced_pipeline_with_contracts,
36};
37use crate::lifecycle::application::route_definition::{
38    BuilderStep, RouteDefinition, RouteDefinitionInfo,
39};
40use crate::shared::components::domain::Registry;
41use crate::shared::observability::domain::{DetailLevel, TracerConfig};
42use arc_swap::ArcSwap;
43use camel_bean::BeanRegistry;
44
/// Notification sent when a route crashes.
///
/// Delivered on the channel installed via
/// `DefaultRouteController::set_crash_notifier` so a supervisor can react.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// The ID of the crashed route.
    pub route_id: String,
    /// The error that caused the crash.
    pub error: String,
}
53
/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
///
/// # Safety
///
/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
///
/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
///    (creates a new boxed service from the inner clone).
/// 2. The clone is owned by the calling thread and never shared.
/// 3. ArcSwap guarantees we only get & references (no &mut).
///
/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
/// clone() does not mutate shared state and each thread gets an independent copy.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: see the struct-level invariants above — the wrapped processor is
// only ever cloned through a shared reference, never mutated in place.
unsafe impl Sync for SyncBoxProcessor {}

/// Hot-swappable processor pipeline: `ArcSwap` lets a replacement be stored
/// atomically while in-flight exchanges keep the previous `Arc` alive.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
72
/// Test-only callback type receiving `&'static str` event labels
/// (presumably emitted from `start_route` paths — see `emit_start_route_event`).
#[cfg(test)]
type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;

// Process-wide slot holding the currently installed test hook, if any.
#[cfg(test)]
static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
79
/// Install (or clear, with `None`) the test-only start-route event hook.
#[cfg(test)]
fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
    let mut slot = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    *slot = hook;
}
86
/// Invoke the installed test hook (if any) with the given event label.
#[cfg(test)]
fn emit_start_route_event(event: &'static str) {
    let guard = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    if let Some(hook) = guard.as_ref() {
        hook(event);
    }
}
97
/// Pipelines and aggregator configuration for a route that `add_route` split
/// around a top-level aggregate step with a timeout condition.
pub(super) struct AggregateSplitInfo {
    /// Steps before the aggregate, composed into one pipeline.
    pub(super) pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    /// Steps after the aggregate, composed into one pipeline.
    pub(super) post_pipeline: SharedPipeline,
}
104
/// Internal state for a managed route.
pub(super) struct ManagedRoute {
    /// The route definition metadata (for introspection).
    pub(super) definition: RouteDefinitionInfo,
    /// Source endpoint URI.
    pub(super) from_uri: String,
    /// Resolved processor pipeline (wrapped for atomic swap).
    pub(super) pipeline: SharedPipeline,
    /// Concurrency model override (if any).
    pub(super) concurrency: Option<ConcurrencyModel>,
    /// Handle for the consumer task (if running).
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping the consumer task.
    /// This allows independent control of the consumer lifecycle (for suspend/resume).
    pub(super) consumer_cancel_token: CancellationToken,
    /// Cancellation token for stopping the pipeline task.
    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
    pub(super) pipeline_cancel_token: CancellationToken,
    /// Channel sender for sending exchanges to the pipeline.
    /// Stored to allow resuming a suspended route without recreating the channel.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// In-flight exchange counter. `None` when UoW is not configured for this route.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Pre/post pipelines for aggregate-with-timeout routes; `None` otherwise.
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    /// Running aggregator service for the split, if one has been started.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
132
133pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
134    handle.as_ref().is_some_and(|h| !h.is_finished())
135}
136
137fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
138    match (
139        handle_is_running(&managed.consumer_handle),
140        handle_is_running(&managed.pipeline_handle),
141    ) {
142        (true, true) => "Started",
143        (false, true) => "Suspended",
144        (true, false) => "Stopping",
145        (false, false) => "Stopped",
146    }
147}
148
149fn find_top_level_aggregate_with_timeout(
150    steps: &[BuilderStep],
151) -> Option<(usize, AggregatorConfig)> {
152    for (i, step) in steps.iter().enumerate() {
153        if let BuilderStep::Aggregate { config } = step {
154            if has_timeout_condition(&config.completion) {
155                return Some((i, config.clone()));
156            }
157            break;
158        }
159    }
160    None
161}
162
/// `ComponentContext` implementation backed by the controller's shared
/// registries; handed to components when resolving endpoints and expressions.
pub(crate) struct ControllerComponentContext {
    // Component registry, looked up by URI scheme.
    registry: Arc<std::sync::Mutex<Registry>>,
    // Declarative expression languages, looked up by name.
    languages: SharedLanguageRegistry,
    // Metrics sink exposed to components (NoOpMetrics at call sites when unset).
    metrics: Arc<dyn MetricsCollector>,
    platform_service: Arc<dyn PlatformService>,
}

impl ControllerComponentContext {
    /// Bundle the shared registries and services into a context object.
    pub(crate) fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        metrics: Arc<dyn MetricsCollector>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            registry,
            languages,
            metrics,
            platform_service,
        }
    }
}
185
186impl ComponentContext for ControllerComponentContext {
187    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
188        self.registry.lock().ok()?.get(scheme)
189    }
190
191    fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
192        self.languages.lock().ok()?.get(name).cloned()
193    }
194
195    fn metrics(&self) -> Arc<dyn MetricsCollector> {
196        Arc::clone(&self.metrics)
197    }
198
199    fn platform_service(&self) -> Arc<dyn PlatformService> {
200        Arc::clone(&self.platform_service)
201    }
202}
203
204fn is_pending(ex: &Exchange) -> bool {
205    ex.property("CamelAggregatorPending")
206        .and_then(|v| v.as_bool())
207        .unwrap_or(false)
208}
209
/// Wait for a pipeline service to be ready with circuit breaker backoff.
///
/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
/// cancellation checks. It returns `Ok(())` when the service is ready, or
/// `Err(e)` if cancellation occurred or a fatal error was encountered.
///
/// Note: when cancellation fires during backoff, the pending `CircuitOpen`
/// error is surfaced rather than a dedicated cancellation error.
async fn ready_with_backoff(
    pipeline: &mut BoxProcessor,
    cancel: &CancellationToken,
) -> Result<(), CamelError> {
    loop {
        match pipeline.ready().await {
            Ok(_) => return Ok(()),
            Err(CamelError::CircuitOpen(ref msg)) => {
                warn!("Circuit open, backing off: {msg}");
                tokio::select! {
                    // Retry after a fixed 1s pause — unless we are cancelled first.
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                        continue;
                    }
                    _ = cancel.cancelled() => {
                        // Shutting down — don't retry.
                        return Err(CamelError::CircuitOpen(msg.clone()));
                    }
                }
            }
            Err(e) => {
                // Any non-circuit readiness error is fatal; propagate immediately.
                error!("Pipeline not ready: {e}");
                return Err(e);
            }
        }
    }
}
242
243fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
244    let stamp = std::time::SystemTime::now()
245        .duration_since(std::time::UNIX_EPOCH)
246        .unwrap_or_default()
247        .as_nanos();
248    RuntimeCommand::FailRoute {
249        route_id: route_id.to_string(),
250        error: error.to_string(),
251        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
252        causation_id: None,
253    }
254}
255
256pub(super) async fn publish_runtime_failure(
257    runtime: Option<Weak<dyn RuntimeHandle>>,
258    route_id: &str,
259    error: &str,
260) {
261    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
262        return;
263    };
264    let command = runtime_failure_command(route_id, error);
265    if let Err(runtime_error) = runtime.execute(command).await {
266        warn!(
267            route_id = %route_id,
268            error = %runtime_error,
269            "failed to synchronize route crash with runtime projection"
270        );
271    }
272}
273
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    /// Held weakly to avoid a reference cycle with the runtime.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform integration service passed into component contexts.
    platform_service: Arc<dyn PlatformService>,
}
304
305impl DefaultRouteController {
    /// Create a new `DefaultRouteController` with the given registry.
    ///
    /// Uses a fresh, empty `BeanRegistry`.
    pub fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
            platform_service,
        )
    }

    /// Create a new `DefaultRouteController` with shared bean registry.
    ///
    /// Falls back to a [`NoopPlatformService`] for platform integration.
    pub fn with_beans(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            beans,
            Arc::new(NoopPlatformService::default()),
        )
    }
329
    /// Shared constructor used by `new` and `with_beans`: starts with an empty
    /// route map, empty language registry, and all optional features disabled.
    fn with_beans_and_platform_service(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            // Empty language registry; use `with_languages` to share one.
            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
            beans,
            runtime: None,
            global_error_handler: None,
            crash_notifier: None,
            tracing_enabled: false,
            tracer_detail_level: DetailLevel::Minimal,
            tracer_metrics: None,
            platform_service,
        }
    }

    /// Create a new `DefaultRouteController` with shared language registry.
    ///
    /// Uses a fresh, empty `BeanRegistry`.
    pub fn with_languages(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            languages,
            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
            runtime: None,
            global_error_handler: None,
            crash_notifier: None,
            tracing_enabled: false,
            tracer_detail_level: DetailLevel::Minimal,
            tracer_metrics: None,
            platform_service,
        }
    }
370
    /// Set runtime handle for ProducerContext creation.
    ///
    /// Only a weak reference is kept, so the controller never keeps the
    /// runtime alive on its own.
    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.runtime = Some(Arc::downgrade(&runtime));
    }

    /// Set the crash notifier for supervision.
    ///
    /// When set, the controller will send a [`CrashNotification`] whenever
    /// a consumer crashes.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }

    /// Set a global error handler applied to all routes without a per-route handler.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }

    /// Configure tracing for this route controller.
    ///
    /// Affects only pipelines compiled after this call; existing routes keep
    /// the settings they were built with.
    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
        self.tracing_enabled = config.enabled;
        self.tracer_detail_level = config.detail_level.clone();
        self.tracer_metrics = config.metrics_collector.clone();
    }
395
396    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
397        let mut producer_ctx = ProducerContext::new();
398        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
399            producer_ctx = producer_ctx.with_runtime(runtime);
400        }
401        Ok(producer_ctx)
402    }
403
404    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
405    fn resolve_error_handler(
406        &self,
407        config: ErrorHandlerConfig,
408        producer_ctx: &ProducerContext,
409        component_ctx: &dyn ComponentContext,
410    ) -> Result<ErrorHandlerLayer, CamelError> {
411        // Resolve DLC URI → producer.
412        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
413            let parsed = parse_uri(uri)?;
414            let component = component_ctx
415                .resolve_component(&parsed.scheme)
416                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
417            let endpoint = component.create_endpoint(uri, component_ctx)?;
418            Some(endpoint.create_producer(producer_ctx)?)
419        } else {
420            None
421        };
422
423        // Resolve per-policy `handled_by` URIs.
424        let mut resolved_policies = Vec::new();
425        for policy in config.policies {
426            let handler_producer = if let Some(ref uri) = policy.handled_by {
427                let parsed = parse_uri(uri)?;
428                let component = component_ctx
429                    .resolve_component(&parsed.scheme)
430                    .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
431                let endpoint = component.create_endpoint(uri, component_ctx)?;
432                Some(endpoint.create_producer(producer_ctx)?)
433            } else {
434                None
435            };
436            resolved_policies.push((policy, handler_producer));
437        }
438
439        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
440    }
441
    /// Resolve a `UnitOfWorkConfig` into an `(ExchangeUoWLayer, Arc<AtomicU64>)`.
    /// Returns `Err` if any hook URI cannot be resolved.
    ///
    /// `counter` lets a caller (e.g. hot-reload recompilation) reuse an
    /// existing in-flight counter so the count survives a pipeline swap;
    /// when `None` a fresh zeroed counter is created.
    fn resolve_uow_layer(
        &self,
        config: &UnitOfWorkConfig,
        producer_ctx: &ProducerContext,
        component_ctx: &dyn ComponentContext,
        counter: Option<Arc<AtomicU64>>,
    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
        // Shared URI→producer resolution for both UoW hook endpoints.
        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
            let parsed = parse_uri(uri)?;
            let component = component_ctx
                .resolve_component(&parsed.scheme)
                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
            let endpoint = component.create_endpoint(uri, component_ctx)?;
            endpoint.create_producer(producer_ctx).map_err(|e| {
                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
            })
        };

        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;

        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
        Ok((layer, counter))
    }
469
    /// Resolve BuilderSteps into BoxProcessors.
    ///
    /// Builds a `ControllerComponentContext` from the controller's shared
    /// registries (falling back to `NoOpMetrics` when no tracer metrics are
    /// configured) and delegates the per-step work to
    /// `step_resolution::resolve_steps`. Each returned pair is the resolved
    /// processor plus its optional body-type contract.
    pub(crate) fn resolve_steps(
        &self,
        steps: Vec<BuilderStep>,
        producer_ctx: &ProducerContext,
        registry: &Arc<std::sync::Mutex<Registry>>,
    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
        let component_ctx = Arc::new(ControllerComponentContext::new(
            Arc::clone(registry),
            Arc::clone(&self.languages),
            self.tracer_metrics
                .clone()
                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            Arc::clone(&self.platform_service),
        ));

        super::step_resolution::resolve_steps(
            steps,
            producer_ctx,
            registry,
            &self.languages,
            &self.beans,
            component_ctx,
        )
    }
495
    /// Add a route definition to the controller.
    ///
    /// Steps are resolved immediately using the registry. The composed
    /// pipeline is wrapped, innermost to outermost, with the optional circuit
    /// breaker, error handler, and unit-of-work layers before being stored.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - A route with the same ID already exists
    /// - Step resolution fails
    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
        let route_id = definition.route_id().to_string();

        // Reject duplicates up front so an existing managed route is never clobbered.
        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, "Adding route to controller");

        // Extract definition info for storage before steps are consumed
        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        // Create ProducerContext from self_ref for step resolution
        let producer_ctx = self.build_producer_context()?;

        // Routes with a top-level aggregate carrying a timeout condition are
        // split into pre/post pipelines around the aggregate; all other routes
        // resolve their full step chain into one pipeline.
        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
            Some((idx, agg_config)) => {
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                // Drop the aggregate step itself; its config is carried separately.
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // The main pipeline stays empty (identity); the split pipelines
                // carry the actual route logic.
                vec![]
            }
            None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
        };
        let route_id_for_tracing = route_id.clone();
        let mut pipeline = if processors_with_contracts.is_empty() {
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        // Apply circuit breaker if configured
        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Determine which error handler config to use (per-route takes precedence)
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Apply UoW layer outermost (after error handler)
        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            // Pass `None` so a brand-new in-flight counter is created for this route.
            let (uow_layer, counter) =
                self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        // Route starts stopped: no tasks, no channel; `start_route` wires those up.
        self.routes.insert(
            route_id.clone(),
            ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
            },
        );

        Ok(())
    }
637
    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting into the route map.
    ///
    /// Used by hot-reload to prepare a new pipeline for atomic swap without disrupting
    /// the running route. The caller is responsible for swapping via `swap_pipeline`.
    ///
    /// Layer order matches `add_route`: circuit breaker, then error handler,
    /// then unit-of-work outermost.
    ///
    /// NOTE(review): unlike `add_route`, this performs no aggregate-with-timeout
    /// split — consistent with `swap_pipeline`'s warning that such routes do not
    /// support hot-reload of pre/post segments.
    pub fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<BoxProcessor, CamelError> {
        let route_id = def.route_id().to_string();

        let producer_ctx = self.build_producer_context()?;

        let processors_with_contracts =
            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
        let mut pipeline = compose_traced_pipeline_with_contracts(
            processors_with_contracts,
            &route_id,
            self.tracing_enabled,
            self.tracer_detail_level.clone(),
            self.tracer_metrics.clone(),
        );

        if let Some(cb_config) = def.circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Per-route error handler takes precedence over the global one.
        let eh_config = def
            .error_handler
            .clone()
            .or_else(|| self.global_error_handler.clone());
        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Apply UoW layer outermost
        if let Some(uow_config) = &def.unit_of_work {
            // Reuse the live route's in-flight counter (if present) so the
            // count survives the pipeline swap.
            let existing_counter = self
                .routes
                .get(&route_id)
                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));

            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );

            let (uow_layer, _counter) = self.resolve_uow_layer(
                uow_config,
                &producer_ctx,
                &component_ctx,
                existing_counter,
            )?;

            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
        }

        Ok(pipeline)
    }
710
711    /// Remove a route from the controller map.
712    ///
713    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
714    /// Returns an error if the route is still running or does not exist.
715    /// Does not cancel any running tasks — call `stop_route` first.
716    pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
717        let managed = self.routes.get(route_id).ok_or_else(|| {
718            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
719        })?;
720        if handle_is_running(&managed.consumer_handle)
721            || handle_is_running(&managed.pipeline_handle)
722        {
723            return Err(CamelError::RouteError(format!(
724                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
725                route_id,
726                inferred_lifecycle_label(managed)
727            )));
728        }
729        self.routes.remove(route_id);
730        info!(route_id = %route_id, "Route removed from controller");
731        Ok(())
732    }
733
    /// Returns the number of routes in the controller.
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }

    /// Returns the in-flight exchange count for a route.
    ///
    /// `None` if the route does not exist; `Some(0)` for routes without a
    /// unit-of-work counter configured.
    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
        self.routes.get(route_id).map(|r| {
            r.in_flight
                .as_ref()
                .map_or(0, |c| c.load(Ordering::Relaxed))
        })
    }

    /// Returns `true` if a route with the given ID exists.
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.contains_key(route_id)
    }

    /// Returns all route IDs.
    pub fn route_ids(&self) -> Vec<String> {
        self.routes.keys().cloned().collect()
    }

    /// Returns the source hash recorded in a route's definition, if any.
    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.routes
            .get(route_id)
            .and_then(|m| m.definition.source_hash())
    }
762
763    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
764    pub fn auto_startup_route_ids(&self) -> Vec<String> {
765        let mut pairs: Vec<(String, i32)> = self
766            .routes
767            .iter()
768            .filter(|(_, managed)| managed.definition.auto_startup())
769            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
770            .collect();
771        pairs.sort_by_key(|(_, order)| *order);
772        pairs.into_iter().map(|(id, _)| id).collect()
773    }
774
775    /// Returns route IDs sorted by shutdown order (startup order descending).
776    pub fn shutdown_route_ids(&self) -> Vec<String> {
777        let mut pairs: Vec<(String, i32)> = self
778            .routes
779            .iter()
780            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
781            .collect();
782        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
783        pairs.into_iter().map(|(id, _)| id).collect()
784    }
785
    /// Atomically swap the pipeline of a route.
    ///
    /// In-flight requests finish with the old pipeline (kept alive by Arc).
    /// New requests immediately use the new pipeline.
    pub fn swap_pipeline(
        &self,
        route_id: &str,
        new_pipeline: BoxProcessor,
    ) -> Result<(), CamelError> {
        let managed = self
            .routes
            .get(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        // Aggregate-split routes keep separate pre/post pipelines that this
        // swap does not touch; warn so hot-reload callers know the limitation.
        if managed.aggregate_split.is_some() {
            tracing::warn!(
                route_id = %route_id,
                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
            );
        }

        managed
            .pipeline
            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
        info!(route_id = %route_id, "Pipeline swapped atomically");
        Ok(())
    }
813
814    /// Returns the from_uri of a route, if it exists.
815    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
816        self.routes.get(route_id).map(|r| r.from_uri.clone())
817    }
818
819    /// Get a clone of the current pipeline for a route.
820    ///
821    /// This is useful for testing and introspection.
822    /// Returns `None` if the route doesn't exist.
823    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
824        self.routes
825            .get(route_id)
826            .map(|r| r.pipeline.load().0.clone())
827    }
828
    /// Internal stop implementation shared by the public stop entry points;
    /// delegates to `consumer_management::stop_route_internal`.
    ///
    /// NOTE(review): the previous comment said this "can set custom status",
    /// but there is no status parameter — presumably a leftover from an
    /// earlier signature; confirm against `consumer_management`.
    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
    }
833
    /// Starts a route as part of a hot-reload cycle.
    ///
    /// Currently identical to `start_route`; kept as a separate entry point so
    /// reload-specific behavior can be added without touching callers.
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }
837
    /// Stops a route as part of a hot-reload cycle.
    ///
    /// Currently identical to `stop_route`; kept as a separate entry point so
    /// reload-specific behavior can be added without touching callers.
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
841}
842
843#[async_trait::async_trait]
844impl RouteController for DefaultRouteController {
    /// Starts the consumer and pipeline tasks for `route_id`.
    ///
    /// Behavior by current execution state:
    /// - both tasks running: no-op, returns `Ok(())`;
    /// - pipeline running but consumer stopped (suspended): error directing
    ///   the caller to `resume_route()`;
    /// - consumer running but pipeline stopped (inconsistent): error directing
    ///   the caller to `stop_route()` then `start_route()`.
    ///
    /// Routes carrying an `aggregate_split` get a dedicated forward loop
    /// (pre-pipeline -> aggregator -> post-pipeline); all other routes run
    /// their pipeline either in-loop (`Sequential`) or via a spawned task per
    /// exchange (`Concurrent`, optionally semaphore-bounded).
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Check if route exists and can be started.
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            if consumer_running && pipeline_running {
                return Ok(());
            }
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
        }

        info!(route_id = %route_id, "Starting route");

        // Get the resolved route info
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        // Clone crash notifier for consumer task
        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            ),
        )?;

        // Resolve effective concurrency: route override > consumer default
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        // Get the managed route for mutation
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create channel for consumer to send exchanges
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        // Create child tokens for independent lifecycle control
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        // Clone sender for storage (to reuse on resume)
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        // --- Aggregator v2: check for aggregate route with timeout ---
        // NOTE(review): `managed` is re-fetched here although no await has
        // occurred since the fetch above — presumably kept for borrow-scope
        // clarity; the earlier borrow could likely be reused.
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        if let Some(split) = managed.aggregate_split.as_ref() {
            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);

            let route_cancel_clone = pipeline_cancel.clone();
            let svc = AggregatorService::new(
                split.agg_config.clone(),
                late_tx,
                Arc::clone(&self.languages),
                route_cancel_clone,
            );
            let agg = Arc::new(std::sync::Mutex::new(svc));

            managed.agg_service = Some(Arc::clone(&agg));

            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
            let pre_pipeline = Arc::clone(&split.pre_pipeline);
            let post_pipeline = Arc::clone(&split.post_pipeline);

            // Spawn biased select forward loop
            let pipeline_handle = tokio::spawn(async move {
                loop {
                    tokio::select! {
                        // `biased` makes the branches poll top-to-bottom:
                        // late (timed-out) exchanges drain before any new
                        // envelope, and cancellation is checked last.
                        biased;

                        late_ex = async {
                            let mut rx = late_rx.lock().await;
                            rx.recv().await
                        } => {
                            match late_ex {
                                Some(ex) => {
                                    let pipe = post_pipeline.load();
                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
                                    }
                                }
                                // Channel closed: aggregator side is gone, stop the loop.
                                None => return,
                            }
                        }

                        envelope_opt = rx.recv() => {
                            match envelope_opt {
                                Some(envelope) => {
                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
                                    let pre_pipe = pre_pipeline.load();
                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
                                        Ok(ex) => ex,
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                            continue;
                                        }
                                    };

                                    // Clone the service out of the mutex so the std lock
                                    // guard is dropped before the await below.
                                    let ex = {
                                        let cloned_svc = agg
                                            .lock()
                                            .expect("mutex poisoned: another thread panicked while holding this lock")
                                            .clone();
                                        cloned_svc.oneshot(ex).await
                                    };

                                    match ex {
                                        Ok(ex) => {
                                            // NOTE(review): presumably `is_pending` marks an
                                            // aggregation that has not completed yet, so the
                                            // exchange skips the post-pipeline — confirm.
                                            if !is_pending(&ex) {
                                                let post_pipe = post_pipeline.load();
                                                let out = post_pipe.0.clone().oneshot(ex).await;
                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
                                            } else if let Some(tx) = reply_tx {
                                                let _ = tx.send(Ok(ex));
                                            }
                                        }
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                        }
                                    }
                                }
                                None => return,
                            }
                        }

                        _ = pipeline_cancel.cancelled() => {
                            // Flush every open aggregation, then drain whatever landed
                            // on the late channel before exiting.
                            {
                                let guard = agg
                                    .lock()
                                    .expect("mutex poisoned: another thread panicked while holding this lock");
                                guard.force_complete_all();
                            }
                            let mut rx_guard = late_rx.lock().await;
                            while let Ok(late_ex) = rx_guard.try_recv() {
                                let pipe = post_pipeline.load();
                                let _ = pipe.0.clone().oneshot(late_ex).await;
                            }
                            break;
                        }
                    }
                }
            });
            #[cfg(test)]
            emit_start_route_event("pipeline_spawned");

            // Start consumer after pipeline loop is spawned to avoid startup races
            // where consumers emit exchanges before the route pipeline begins polling.
            let consumer_handle = super::consumer_management::spawn_consumer_task(
                route_id.to_string(),
                consumer,
                consumer_ctx,
                crash_notifier,
                runtime_for_consumer,
                false,
            );
            #[cfg(test)]
            emit_start_route_event("consumer_spawned");

            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist");
            managed.consumer_handle = Some(consumer_handle);
            managed.pipeline_handle = Some(pipeline_handle);
            managed.channel_sender = Some(tx_for_storage);

            info!(route_id = %route_id, "Route started (aggregate with timeout)");
            return Ok(());
        }
        // --- End aggregator v2 branch ---

        // Spawn pipeline task with its own cancellation token
        let pipeline_handle = match effective_concurrency {
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    loop {
                        // Use select! to exit promptly on cancellation even when idle
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, // Channel closed
                            },
                            _ = pipeline_cancel.cancelled() => {
                                // Cancellation requested - exit gracefully
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
                        let mut pipeline = pipeline.load().0.clone();

                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            // Fire-and-forget exchange: nobody is waiting for the
                            // result, so the error is only logged (Stopped is expected
                            // during shutdown and stays silent).
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            ConcurrencyModel::Concurrent { max } => {
                // `max: None` means unbounded; otherwise a semaphore caps the
                // number of in-flight exchange tasks.
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        // Use select! to exit promptly on cancellation even when idle
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, // Channel closed
                            },
                            _ = pipeline_cancel.cancelled() => {
                                // Cancellation requested - exit gracefully
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        tokio::spawn(async move {
                            // Acquire semaphore permit if bounded
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            // Load current pipeline from ArcSwap
                            let mut pipe = pipe_ref.load().0.clone();

                            // Wait for service ready with circuit breaker backoff
                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };
        #[cfg(test)]
        emit_start_route_event("pipeline_spawned");

        // Start consumer after pipeline task is spawned to minimize the chance of
        // fire-and-forget events being produced before the pipeline loop is active.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            false,
        );
        #[cfg(test)]
        emit_start_route_event("consumer_spawned");

        // Store handles and update status
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }
1170
    /// Stops both the consumer and pipeline tasks of a route; delegates to the
    /// shared internal stop implementation.
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }
1174
    /// Stops the route, waits briefly, and starts it again.
    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await?;
        // NOTE(review): fixed 100ms grace period between stop and start —
        // presumably to let background tasks wind down fully; confirm whether
        // a deterministic completion signal could replace the sleep.
        tokio::time::sleep(Duration::from_millis(100)).await;
        self.start_route(route_id).await
    }
1180
1181    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1182        // Check route exists and state.
1183        let managed = self
1184            .routes
1185            .get_mut(route_id)
1186            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1187
1188        let consumer_running = handle_is_running(&managed.consumer_handle);
1189        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1190
1191        // Can only suspend from active started state.
1192        if !consumer_running || !pipeline_running {
1193            return Err(CamelError::RouteError(format!(
1194                "Cannot suspend route '{}' with execution lifecycle {}",
1195                route_id,
1196                inferred_lifecycle_label(managed)
1197            )));
1198        }
1199
1200        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1201
1202        // Cancel consumer token only (keep pipeline running)
1203        let managed = self
1204            .routes
1205            .get_mut(route_id)
1206            .expect("invariant: route must exist after prior existence check");
1207        managed.consumer_cancel_token.cancel();
1208
1209        // Take and join consumer handle
1210        let managed = self
1211            .routes
1212            .get_mut(route_id)
1213            .expect("invariant: route must exist after prior existence check");
1214        let consumer_handle = managed.consumer_handle.take();
1215
1216        // Wait for consumer task to complete with timeout
1217        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1218            if let Some(handle) = consumer_handle {
1219                let _ = handle.await;
1220            }
1221        })
1222        .await;
1223
1224        if timeout_result.is_err() {
1225            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1226        }
1227
1228        // Get the managed route again (can't hold across await)
1229        let managed = self
1230            .routes
1231            .get_mut(route_id)
1232            .expect("invariant: route must exist after prior existence check");
1233
1234        // Create fresh cancellation token for consumer (for resume)
1235        managed.consumer_cancel_token = CancellationToken::new();
1236
1237        info!(route_id = %route_id, "Route suspended (pipeline still running)");
1238        Ok(())
1239    }
1240
    /// Resumes a suspended route by spawning a fresh consumer that feeds the
    /// still-running pipeline through the channel sender stored at suspend
    /// time.
    ///
    /// # Errors
    ///
    /// Returns `CamelError::RouteError` when the route does not exist, is not
    /// in the suspended execution state (consumer stopped, pipeline running),
    /// or has no stored channel sender.
    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Check route exists and is Suspended-equivalent execution state.
        let managed = self
            .routes
            .get(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);
        if consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        // Get the stored channel sender (must exist for a suspended route)
        let sender = managed.channel_sender.clone().ok_or_else(|| {
            CamelError::RouteError("Suspended route has no channel sender".into())
        })?;

        // Get from_uri and concurrency for creating new consumer
        let from_uri = managed.from_uri.clone();

        info!(route_id = %route_id, "Resuming route (spawning consumer only)");

        // The consumer's default concurrency is discarded here: the pipeline
        // task (and thus the effective concurrency model) is already running.
        let (consumer, _) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            ),
        )?;

        // Get the managed route for mutation
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create child token for consumer lifecycle
        let consumer_cancel = managed.consumer_cancel_token.child_token();

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        // Create ConsumerContext with the stored sender
        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());

        // Spawn consumer task
        // NOTE(review): the trailing `true` differs from start_route's `false`
        // — presumably a "resuming" flag; confirm in spawn_consumer_task.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            true,
        );

        // Store consumer handle and update status
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);

        info!(route_id = %route_id, "Route resumed");
        Ok(())
    }
1316
1317    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1318        // Only start routes where auto_startup() == true
1319        // Sort by startup_order() ascending before starting
1320        let route_ids: Vec<String> = {
1321            let mut pairs: Vec<_> = self
1322                .routes
1323                .iter()
1324                .filter(|(_, r)| r.definition.auto_startup())
1325                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1326                .collect();
1327            pairs.sort_by_key(|(_, order)| *order);
1328            pairs.into_iter().map(|(id, _)| id).collect()
1329        };
1330
1331        info!("Starting {} auto-startup routes", route_ids.len());
1332
1333        // Collect errors but continue starting remaining routes
1334        let mut errors: Vec<String> = Vec::new();
1335        for route_id in route_ids {
1336            if let Err(e) = self.start_route(&route_id).await {
1337                errors.push(format!("Route '{}': {}", route_id, e));
1338            }
1339        }
1340
1341        if !errors.is_empty() {
1342            return Err(CamelError::RouteError(format!(
1343                "Failed to start routes: {}",
1344                errors.join(", ")
1345            )));
1346        }
1347
1348        info!("All auto-startup routes started");
1349        Ok(())
1350    }
1351
1352    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1353        // Sort by startup_order descending (reverse order)
1354        let route_ids: Vec<String> = {
1355            let mut pairs: Vec<_> = self
1356                .routes
1357                .iter()
1358                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1359                .collect();
1360            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1361            pairs.into_iter().map(|(id, _)| id).collect()
1362        };
1363
1364        info!("Stopping {} routes", route_ids.len());
1365
1366        for route_id in route_ids {
1367            let _ = self.stop_route(&route_id).await;
1368        }
1369
1370        info!("All routes stopped");
1371        Ok(())
1372    }
1373}
1374
1375#[cfg(test)]
1376#[path = "route_controller_tests.rs"]
1377mod tests;