Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21    BoxProcessor, CamelError, Exchange, IdentityProcessor, NoOpMetrics, NoopPlatformService,
22    PlatformService, ProducerContext, RouteController, RuntimeCommand, RuntimeHandle,
23};
24use camel_component_api::{
25    ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
26};
27use camel_endpoint::parse_uri;
28pub use camel_processor::aggregator::SharedLanguageRegistry;
29use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
30use camel_processor::circuit_breaker::CircuitBreakerLayer;
31use camel_processor::error_handler::ErrorHandlerLayer;
32
33use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
34use crate::lifecycle::adapters::route_compiler::{
35    compose_pipeline, compose_traced_pipeline_with_contracts,
36};
37use crate::lifecycle::application::route_definition::{
38    BuilderStep, RouteDefinition, RouteDefinitionInfo,
39};
40use crate::shared::components::domain::Registry;
41use crate::shared::observability::domain::{DetailLevel, TracerConfig};
42use arc_swap::ArcSwap;
43use camel_bean::BeanRegistry;
44
/// Notification sent when a route crashes.
///
/// Delivered on the channel registered via `set_crash_notifier` so a
/// supervisor can react to consumer crashes.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// The ID of the crashed route.
    pub route_id: String,
    /// The error that caused the crash.
    pub error: String,
}
53
/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
///
/// # Safety
///
/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
///
/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
///    (creates a new boxed service from the inner clone).
/// 2. The clone is owned by the calling thread and never shared.
/// 3. ArcSwap guarantees we only get & references (no &mut).
///
/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
/// clone() does not mutate shared state and each thread gets an independent copy.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: see the `# Safety` section above — the wrapped BoxProcessor is only
// ever reached through shared references for clone(), which does not mutate
// shared state, so cross-thread `&SyncBoxProcessor` access is sound.
unsafe impl Sync for SyncBoxProcessor {}
70
/// Pipeline handle shared between the controller and route tasks.
/// `ArcSwap` lets `swap_pipeline` replace the processor atomically while
/// in-flight exchanges keep using the old `Arc`.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;

/// Pipeline segments for a route whose first top-level aggregate step has a
/// timeout completion condition (see `find_top_level_aggregate_with_timeout`
/// and `add_route`): the step list is split around that aggregate step.
pub(super) struct AggregateSplitInfo {
    /// Pipeline composed from the steps before the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    /// Pipeline composed from the steps after the aggregate step.
    pub(super) post_pipeline: SharedPipeline,
}
79
/// Internal state for a managed route.
pub(super) struct ManagedRoute {
    /// The route definition metadata (for introspection).
    pub(super) definition: RouteDefinitionInfo,
    /// Source endpoint URI.
    pub(super) from_uri: String,
    /// Resolved processor pipeline (wrapped for atomic swap).
    pub(super) pipeline: SharedPipeline,
    /// Concurrency model override (if any).
    pub(super) concurrency: Option<ConcurrencyModel>,
    /// Handle for the consumer task (if running).
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping the consumer task.
    /// This allows independent control of the consumer lifecycle (for suspend/resume).
    pub(super) consumer_cancel_token: CancellationToken,
    /// Cancellation token for stopping the pipeline task.
    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
    pub(super) pipeline_cancel_token: CancellationToken,
    /// Channel sender for sending exchanges to the pipeline.
    /// Stored to allow resuming a suspended route without recreating the channel.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// In-flight exchange counter. `None` when UoW is not configured for this route.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Pre/post pipeline segments for aggregate-with-timeout routes
    /// (see [`AggregateSplitInfo`]); `None` for ordinary routes.
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    /// Shared aggregator service. Set to `None` by `add_route`; presumably
    /// populated when an aggregate-split route starts — set outside this module.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
107
108pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
109    handle.as_ref().is_some_and(|h| !h.is_finished())
110}
111
112fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
113    match (
114        handle_is_running(&managed.consumer_handle),
115        handle_is_running(&managed.pipeline_handle),
116    ) {
117        (true, true) => "Started",
118        (false, true) => "Suspended",
119        (true, false) => "Stopping",
120        (false, false) => "Stopped",
121    }
122}
123
124fn find_top_level_aggregate_with_timeout(
125    steps: &[BuilderStep],
126) -> Option<(usize, AggregatorConfig)> {
127    for (i, step) in steps.iter().enumerate() {
128        if let BuilderStep::Aggregate { config } = step {
129            if has_timeout_condition(&config.completion) {
130                return Some((i, config.clone()));
131            }
132            break;
133        }
134    }
135    None
136}
137
/// [`ComponentContext`] implementation backed by the controller's shared
/// registries and services.
pub(crate) struct ControllerComponentContext {
    /// Component registry used to resolve endpoint URI schemes.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Registry of declarative expression languages.
    languages: SharedLanguageRegistry,
    /// Metrics collector exposed to components.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service exposed to components.
    platform_service: Arc<dyn PlatformService>,
}
144
impl ControllerComponentContext {
    /// Build a context from the controller's shared registry, language
    /// registry, metrics collector, and platform service.
    pub(crate) fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        metrics: Arc<dyn MetricsCollector>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            registry,
            languages,
            metrics,
            platform_service,
        }
    }
}
160
impl ComponentContext for ControllerComponentContext {
    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
        // A poisoned registry lock is treated the same as "not found".
        self.registry.lock().ok()?.get(scheme)
    }

    fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
        // Likewise, a poisoned language-registry lock yields `None`.
        self.languages.lock().ok()?.get(name).cloned()
    }

    fn metrics(&self) -> Arc<dyn MetricsCollector> {
        Arc::clone(&self.metrics)
    }

    fn platform_service(&self) -> Arc<dyn PlatformService> {
        Arc::clone(&self.platform_service)
    }
}
178
179fn is_pending(ex: &Exchange) -> bool {
180    ex.property("CamelAggregatorPending")
181        .and_then(|v| v.as_bool())
182        .unwrap_or(false)
183}
184
/// Wait for a pipeline service to be ready with circuit breaker backoff.
///
/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
/// cancellation checks. It returns `Ok(())` when the service is ready, or
/// `Err(e)` if cancellation occurred or a fatal error was encountered.
async fn ready_with_backoff(
    pipeline: &mut BoxProcessor,
    cancel: &CancellationToken,
) -> Result<(), CamelError> {
    loop {
        match pipeline.ready().await {
            Ok(_) => return Ok(()),
            Err(CamelError::CircuitOpen(ref msg)) => {
                warn!("Circuit open, backing off: {msg}");
                // Race the fixed backoff against cancellation so shutdown is
                // never delayed by up to a full second of sleeping.
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                        continue;
                    }
                    _ = cancel.cancelled() => {
                        // Shutting down — don't retry. Surface the open-circuit
                        // error so the caller stops driving this pipeline.
                        return Err(CamelError::CircuitOpen(msg.clone()));
                    }
                }
            }
            Err(e) => {
                // Any non-circuit error is treated as fatal: log and bail out.
                error!("Pipeline not ready: {e}");
                return Err(e);
            }
        }
    }
}
217
218fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
219    let stamp = std::time::SystemTime::now()
220        .duration_since(std::time::UNIX_EPOCH)
221        .unwrap_or_default()
222        .as_nanos();
223    RuntimeCommand::FailRoute {
224        route_id: route_id.to_string(),
225        error: error.to_string(),
226        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
227        causation_id: None,
228    }
229}
230
231pub(super) async fn publish_runtime_failure(
232    runtime: Option<Weak<dyn RuntimeHandle>>,
233    route_id: &str,
234    error: &str,
235) {
236    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
237        return;
238    };
239    let command = runtime_failure_command(route_id, error);
240    if let Err(runtime_error) = runtime.execute(command).await {
241        warn!(
242            route_id = %route_id,
243            error = %runtime_error,
244            "failed to synchronize route crash with runtime projection"
245        );
246    }
247}
248
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    /// Held as `Weak` so the controller does not keep the runtime alive.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform service handed to component contexts built by this controller.
    platform_service: Arc<dyn PlatformService>,
}
279
280impl DefaultRouteController {
    /// Create a new `DefaultRouteController` with the given registry.
    ///
    /// A fresh, empty bean registry is created; use [`Self::with_beans`] to
    /// share an existing one.
    pub fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
            platform_service,
        )
    }
292
    /// Create a new `DefaultRouteController` with shared bean registry.
    ///
    /// The platform service defaults to a [`NoopPlatformService`].
    pub fn with_beans(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
    ) -> Self {
        Self::with_beans_and_platform_service(
            registry,
            beans,
            Arc::new(NoopPlatformService::default()),
        )
    }
304
    /// Shared constructor used by `new` and `with_beans`: every optional
    /// feature (runtime handle, error handler, crash notifier, tracing)
    /// starts disabled, and the language registry starts empty.
    fn with_beans_and_platform_service(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
        platform_service: Arc<dyn PlatformService>,
    ) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
            beans,
            runtime: None,
            global_error_handler: None,
            crash_notifier: None,
            tracing_enabled: false,
            tracer_detail_level: DetailLevel::Minimal,
            tracer_metrics: None,
            platform_service,
        }
    }
324
325    /// Create a new `DefaultRouteController` with shared language registry.
326    pub fn with_languages(
327        registry: Arc<std::sync::Mutex<Registry>>,
328        languages: SharedLanguageRegistry,
329        platform_service: Arc<dyn PlatformService>,
330    ) -> Self {
331        Self {
332            routes: HashMap::new(),
333            registry,
334            languages,
335            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
336            runtime: None,
337            global_error_handler: None,
338            crash_notifier: None,
339            tracing_enabled: false,
340            tracer_detail_level: DetailLevel::Minimal,
341            tracer_metrics: None,
342            platform_service,
343        }
344    }
345
    /// Set runtime handle for ProducerContext creation.
    ///
    /// Stored as a `Weak` reference, so the controller never keeps the
    /// runtime alive on its own.
    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.runtime = Some(Arc::downgrade(&runtime));
    }

    /// Set the crash notifier for supervision.
    ///
    /// When set, the controller will send a [`CrashNotification`] whenever
    /// a consumer crashes.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }

    /// Set a global error handler applied to all routes without a per-route handler.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }

    /// Configure tracing for this route controller.
    ///
    /// Copies the enabled flag, detail level, and metrics collector from the
    /// given [`TracerConfig`]. Affects pipelines built after this call;
    /// already-built pipelines are not rebuilt.
    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
        self.tracing_enabled = config.enabled;
        self.tracer_detail_level = config.detail_level.clone();
        self.tracer_metrics = config.metrics_collector.clone();
    }
370
371    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
372        let mut producer_ctx = ProducerContext::new();
373        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
374            producer_ctx = producer_ctx.with_runtime(runtime);
375        }
376        Ok(producer_ctx)
377    }
378
379    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
380    fn resolve_error_handler(
381        &self,
382        config: ErrorHandlerConfig,
383        producer_ctx: &ProducerContext,
384        component_ctx: &dyn ComponentContext,
385    ) -> Result<ErrorHandlerLayer, CamelError> {
386        // Resolve DLC URI → producer.
387        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
388            let parsed = parse_uri(uri)?;
389            let component = component_ctx
390                .resolve_component(&parsed.scheme)
391                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
392            let endpoint = component.create_endpoint(uri, component_ctx)?;
393            Some(endpoint.create_producer(producer_ctx)?)
394        } else {
395            None
396        };
397
398        // Resolve per-policy `handled_by` URIs.
399        let mut resolved_policies = Vec::new();
400        for policy in config.policies {
401            let handler_producer = if let Some(ref uri) = policy.handled_by {
402                let parsed = parse_uri(uri)?;
403                let component = component_ctx
404                    .resolve_component(&parsed.scheme)
405                    .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
406                let endpoint = component.create_endpoint(uri, component_ctx)?;
407                Some(endpoint.create_producer(producer_ctx)?)
408            } else {
409                None
410            };
411            resolved_policies.push((policy, handler_producer));
412        }
413
414        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
415    }
416
417    /// Resolve a `UnitOfWorkConfig` into an `(ExchangeUoWLayer, Arc<AtomicU64>)`.
418    /// Returns `Err` if any hook URI cannot be resolved.
419    fn resolve_uow_layer(
420        &self,
421        config: &UnitOfWorkConfig,
422        producer_ctx: &ProducerContext,
423        component_ctx: &dyn ComponentContext,
424        counter: Option<Arc<AtomicU64>>,
425    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
426        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
427            let parsed = parse_uri(uri)?;
428            let component = component_ctx
429                .resolve_component(&parsed.scheme)
430                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
431            let endpoint = component.create_endpoint(uri, component_ctx)?;
432            endpoint.create_producer(producer_ctx).map_err(|e| {
433                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
434            })
435        };
436
437        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
438        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
439
440        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
441        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
442        Ok((layer, counter))
443    }
444
445    /// Resolve BuilderSteps into BoxProcessors.
446    pub(crate) fn resolve_steps(
447        &self,
448        steps: Vec<BuilderStep>,
449        producer_ctx: &ProducerContext,
450        registry: &Arc<std::sync::Mutex<Registry>>,
451    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
452        let component_ctx = Arc::new(ControllerComponentContext::new(
453            Arc::clone(registry),
454            Arc::clone(&self.languages),
455            self.tracer_metrics
456                .clone()
457                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
458            Arc::clone(&self.platform_service),
459        ));
460
461        super::step_resolution::resolve_steps(
462            steps,
463            producer_ctx,
464            registry,
465            &self.languages,
466            &self.beans,
467            component_ctx,
468        )
469    }
470
    /// Add a route definition to the controller.
    ///
    /// Steps are resolved immediately using the registry.
    ///
    /// The pipeline is layered inside-out: traced step pipeline, then circuit
    /// breaker, then error handler, then unit-of-work (outermost). Routes whose
    /// first top-level aggregate step has a timeout completion condition are
    /// split into pre/post segments instead of a single flat pipeline.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - A route with the same ID already exists
    /// - Step resolution fails
    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
        let route_id = definition.route_id().to_string();

        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, "Adding route to controller");

        // Extract definition info for storage before steps are consumed
        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        // Create ProducerContext from self_ref for step resolution
        let producer_ctx = self.build_producer_context()?;

        // Take ownership of steps before resolve_steps consumes them
        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
            Some((idx, agg_config)) => {
                // Split the step list around the aggregate step: steps before
                // it become the pre pipeline, steps after it the post pipeline,
                // and the aggregate step itself is kept as `agg_config`.
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // No flat pipeline in the split case — the `pipeline` field
                // below falls back to an identity processor.
                vec![]
            }
            None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
        };
        let route_id_for_tracing = route_id.clone();
        let mut pipeline = if processors_with_contracts.is_empty() {
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        // Apply circuit breaker if configured
        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Determine which error handler config to use (per-route takes precedence)
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Apply UoW layer outermost (after error handler)
        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let (uow_layer, counter) =
                self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        self.routes.insert(
            route_id.clone(),
            ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
            },
        );

        Ok(())
    }
612
    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting into the route map.
    ///
    /// Used by hot-reload to prepare a new pipeline for atomic swap without disrupting
    /// the running route. The caller is responsible for swapping via `swap_pipeline`.
    ///
    /// Layering matches `add_route`: steps, then circuit breaker, then error
    /// handler, then unit-of-work outermost. NOTE(review): unlike `add_route`,
    /// this path performs no aggregate-with-timeout split and no empty-steps
    /// identity fallback — presumably acceptable because `swap_pipeline` warns
    /// that aggregate routes do not support hot-reload; confirm.
    pub fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<BoxProcessor, CamelError> {
        let route_id = def.route_id().to_string();

        let producer_ctx = self.build_producer_context()?;

        let processors_with_contracts =
            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
        let mut pipeline = compose_traced_pipeline_with_contracts(
            processors_with_contracts,
            &route_id,
            self.tracing_enabled,
            self.tracer_detail_level.clone(),
            self.tracer_metrics.clone(),
        );

        if let Some(cb_config) = def.circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Per-route handler takes precedence over the global one.
        let eh_config = def
            .error_handler
            .clone()
            .or_else(|| self.global_error_handler.clone());
        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Apply UoW layer outermost
        if let Some(uow_config) = &def.unit_of_work {
            // Reuse the existing route's in-flight counter so the count is
            // preserved across the pipeline swap.
            let existing_counter = self
                .routes
                .get(&route_id)
                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));

            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.platform_service),
            );

            let (uow_layer, _counter) = self.resolve_uow_layer(
                uow_config,
                &producer_ctx,
                &component_ctx,
                existing_counter,
            )?;

            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
        }

        Ok(pipeline)
    }
685
686    /// Remove a route from the controller map.
687    ///
688    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
689    /// Returns an error if the route is still running or does not exist.
690    /// Does not cancel any running tasks — call `stop_route` first.
691    pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
692        let managed = self.routes.get(route_id).ok_or_else(|| {
693            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
694        })?;
695        if handle_is_running(&managed.consumer_handle)
696            || handle_is_running(&managed.pipeline_handle)
697        {
698            return Err(CamelError::RouteError(format!(
699                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
700                route_id,
701                inferred_lifecycle_label(managed)
702            )));
703        }
704        self.routes.remove(route_id);
705        info!(route_id = %route_id, "Route removed from controller");
706        Ok(())
707    }
708
    /// Returns the number of routes in the controller.
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }

    /// Returns the in-flight exchange count for a route.
    ///
    /// `None` if the route does not exist. `Some(0)` when the route has no
    /// unit-of-work counter configured *or* genuinely has zero in flight —
    /// callers cannot distinguish the two cases from this method alone.
    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
        self.routes.get(route_id).map(|r| {
            r.in_flight
                .as_ref()
                .map_or(0, |c| c.load(Ordering::Relaxed))
        })
    }

    /// Returns `true` if a route with the given ID exists.
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.contains_key(route_id)
    }

    /// Returns all route IDs.
    pub fn route_ids(&self) -> Vec<String> {
        self.routes.keys().cloned().collect()
    }

    /// Returns the source hash recorded in a route's definition, if any.
    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.routes
            .get(route_id)
            .and_then(|m| m.definition.source_hash())
    }
737
738    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
739    pub fn auto_startup_route_ids(&self) -> Vec<String> {
740        let mut pairs: Vec<(String, i32)> = self
741            .routes
742            .iter()
743            .filter(|(_, managed)| managed.definition.auto_startup())
744            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
745            .collect();
746        pairs.sort_by_key(|(_, order)| *order);
747        pairs.into_iter().map(|(id, _)| id).collect()
748    }
749
750    /// Returns route IDs sorted by shutdown order (startup order descending).
751    pub fn shutdown_route_ids(&self) -> Vec<String> {
752        let mut pairs: Vec<(String, i32)> = self
753            .routes
754            .iter()
755            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
756            .collect();
757        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
758        pairs.into_iter().map(|(id, _)| id).collect()
759    }
760
761    /// Atomically swap the pipeline of a route.
762    ///
763    /// In-flight requests finish with the old pipeline (kept alive by Arc).
764    /// New requests immediately use the new pipeline.
765    pub fn swap_pipeline(
766        &self,
767        route_id: &str,
768        new_pipeline: BoxProcessor,
769    ) -> Result<(), CamelError> {
770        let managed = self
771            .routes
772            .get(route_id)
773            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
774
775        if managed.aggregate_split.is_some() {
776            tracing::warn!(
777                route_id = %route_id,
778                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
779            );
780        }
781
782        managed
783            .pipeline
784            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
785        info!(route_id = %route_id, "Pipeline swapped atomically");
786        Ok(())
787    }
788
789    /// Returns the from_uri of a route, if it exists.
790    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
791        self.routes.get(route_id).map(|r| r.from_uri.clone())
792    }
793
794    /// Get a clone of the current pipeline for a route.
795    ///
796    /// This is useful for testing and introspection.
797    /// Returns `None` if the route doesn't exist.
798    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
799        self.routes
800            .get(route_id)
801            .map(|r| r.pipeline.load().0.clone())
802    }
803
    /// Internal stop implementation that can set custom status.
    ///
    /// Thin delegation to `super::consumer_management::stop_route_internal`,
    /// which operates directly on the route map.
    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
    }
808
    /// Starts a route as part of a hot-reload cycle.
    ///
    /// Currently delegates directly to [`RouteController::start_route`].
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }
812
    /// Stops a route as part of a hot-reload cycle.
    ///
    /// Currently delegates directly to [`RouteController::stop_route`].
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
816}
817
818#[async_trait::async_trait]
819impl RouteController for DefaultRouteController {
820    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
821        // Check if route exists and can be started.
822        {
823            let managed = self
824                .routes
825                .get_mut(route_id)
826                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
827
828            let consumer_running = handle_is_running(&managed.consumer_handle);
829            let pipeline_running = handle_is_running(&managed.pipeline_handle);
830            if consumer_running && pipeline_running {
831                return Ok(());
832            }
833            if !consumer_running && pipeline_running {
834                return Err(CamelError::RouteError(format!(
835                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
836                    route_id
837                )));
838            }
839            if consumer_running && !pipeline_running {
840                return Err(CamelError::RouteError(format!(
841                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
842                    route_id
843                )));
844            }
845        }
846
847        info!(route_id = %route_id, "Starting route");
848
849        // Get the resolved route info
850        let (from_uri, pipeline, concurrency) = {
851            let managed = self
852                .routes
853                .get(route_id)
854                .expect("invariant: route must exist after prior existence check");
855            (
856                managed.from_uri.clone(),
857                Arc::clone(&managed.pipeline),
858                managed.concurrency.clone(),
859            )
860        };
861
862        // Clone crash notifier for consumer task
863        let crash_notifier = self.crash_notifier.clone();
864        let runtime_for_consumer = self.runtime.clone();
865
866        let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
867            &self.registry,
868            &from_uri,
869            &ControllerComponentContext::new(
870                Arc::clone(&self.registry),
871                Arc::clone(&self.languages),
872                self.tracer_metrics
873                    .clone()
874                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
875                Arc::clone(&self.platform_service),
876            ),
877        )?;
878
879        // Resolve effective concurrency: route override > consumer default
880        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
881
882        // Get the managed route for mutation
883        let managed = self
884            .routes
885            .get_mut(route_id)
886            .expect("invariant: route must exist after prior existence check");
887
888        // Create channel for consumer to send exchanges
889        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
890        // Create child tokens for independent lifecycle control
891        let consumer_cancel = managed.consumer_cancel_token.child_token();
892        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
893        // Clone sender for storage (to reuse on resume)
894        let tx_for_storage = tx.clone();
895        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
896
897        // --- Aggregator v2: check for aggregate route with timeout ---
898        let managed = self
899            .routes
900            .get_mut(route_id)
901            .expect("invariant: route must exist after prior existence check");
902
903        if let Some(split) = managed.aggregate_split.as_ref() {
904            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
905
906            let route_cancel_clone = pipeline_cancel.clone();
907            let svc = AggregatorService::new(
908                split.agg_config.clone(),
909                late_tx,
910                Arc::clone(&self.languages),
911                route_cancel_clone,
912            );
913            let agg = Arc::new(std::sync::Mutex::new(svc));
914
915            managed.agg_service = Some(Arc::clone(&agg));
916
917            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
918            let pre_pipeline = Arc::clone(&split.pre_pipeline);
919            let post_pipeline = Arc::clone(&split.post_pipeline);
920
921            // Spawn consumer task (same as normal route)
922            let consumer_handle = super::consumer_management::spawn_consumer_task(
923                route_id.to_string(),
924                consumer,
925                consumer_ctx,
926                crash_notifier,
927                runtime_for_consumer,
928                false,
929            );
930
931            // Spawn biased select forward loop
932            let pipeline_handle = tokio::spawn(async move {
933                loop {
934                    tokio::select! {
935                        biased;
936
937                        late_ex = async {
938                            let mut rx = late_rx.lock().await;
939                            rx.recv().await
940                        } => {
941                            match late_ex {
942                                Some(ex) => {
943                                    let pipe = post_pipeline.load();
944                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
945                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
946                                    }
947                                }
948                                None => return,
949                            }
950                        }
951
952                        envelope_opt = rx.recv() => {
953                            match envelope_opt {
954                                Some(envelope) => {
955                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
956                                    let pre_pipe = pre_pipeline.load();
957                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
958                                        Ok(ex) => ex,
959                                        Err(e) => {
960                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
961                                            continue;
962                                        }
963                                    };
964
965                                    let ex = {
966                                        let cloned_svc = agg
967                                            .lock()
968                                            .expect("mutex poisoned: another thread panicked while holding this lock")
969                                            .clone();
970                                        cloned_svc.oneshot(ex).await
971                                    };
972
973                                    match ex {
974                                        Ok(ex) => {
975                                            if !is_pending(&ex) {
976                                                let post_pipe = post_pipeline.load();
977                                                let out = post_pipe.0.clone().oneshot(ex).await;
978                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
979                                            } else if let Some(tx) = reply_tx {
980                                                let _ = tx.send(Ok(ex));
981                                            }
982                                        }
983                                        Err(e) => {
984                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
985                                        }
986                                    }
987                                }
988                                None => return,
989                            }
990                        }
991
992                        _ = pipeline_cancel.cancelled() => {
993                            {
994                                let guard = agg
995                                    .lock()
996                                    .expect("mutex poisoned: another thread panicked while holding this lock");
997                                guard.force_complete_all();
998                            }
999                            let mut rx_guard = late_rx.lock().await;
1000                            while let Ok(late_ex) = rx_guard.try_recv() {
1001                                let pipe = post_pipeline.load();
1002                                let _ = pipe.0.clone().oneshot(late_ex).await;
1003                            }
1004                            break;
1005                        }
1006                    }
1007                }
1008            });
1009
1010            let managed = self
1011                .routes
1012                .get_mut(route_id)
1013                .expect("invariant: route must exist");
1014            managed.consumer_handle = Some(consumer_handle);
1015            managed.pipeline_handle = Some(pipeline_handle);
1016            managed.channel_sender = Some(tx_for_storage);
1017
1018            info!(route_id = %route_id, "Route started (aggregate with timeout)");
1019            return Ok(());
1020        }
1021        // --- End aggregator v2 branch ---
1022
1023        // Start consumer in background task.
1024        let consumer_handle = super::consumer_management::spawn_consumer_task(
1025            route_id.to_string(),
1026            consumer,
1027            consumer_ctx,
1028            crash_notifier,
1029            runtime_for_consumer,
1030            false,
1031        );
1032
1033        // Spawn pipeline task with its own cancellation token
1034        let pipeline_handle = match effective_concurrency {
1035            ConcurrencyModel::Sequential => {
1036                tokio::spawn(async move {
1037                    loop {
1038                        // Use select! to exit promptly on cancellation even when idle
1039                        let envelope = tokio::select! {
1040                            envelope = rx.recv() => match envelope {
1041                                Some(e) => e,
1042                                None => return, // Channel closed
1043                            },
1044                            _ = pipeline_cancel.cancelled() => {
1045                                // Cancellation requested - exit gracefully
1046                                return;
1047                            }
1048                        };
1049                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1050
1051                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
1052                        let mut pipeline = pipeline.load().0.clone();
1053
1054                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1055                            if let Some(tx) = reply_tx {
1056                                let _ = tx.send(Err(e));
1057                            }
1058                            return;
1059                        }
1060
1061                        let result = pipeline.call(exchange).await;
1062                        if let Some(tx) = reply_tx {
1063                            let _ = tx.send(result);
1064                        } else if let Err(ref e) = result
1065                            && !matches!(e, CamelError::Stopped)
1066                        {
1067                            error!("Pipeline error: {e}");
1068                        }
1069                    }
1070                })
1071            }
1072            ConcurrencyModel::Concurrent { max } => {
1073                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1074                tokio::spawn(async move {
1075                    loop {
1076                        // Use select! to exit promptly on cancellation even when idle
1077                        let envelope = tokio::select! {
1078                            envelope = rx.recv() => match envelope {
1079                                Some(e) => e,
1080                                None => return, // Channel closed
1081                            },
1082                            _ = pipeline_cancel.cancelled() => {
1083                                // Cancellation requested - exit gracefully
1084                                return;
1085                            }
1086                        };
1087                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1088                        let pipe_ref = Arc::clone(&pipeline);
1089                        let sem = sem.clone();
1090                        let cancel = pipeline_cancel.clone();
1091                        tokio::spawn(async move {
1092                            // Acquire semaphore permit if bounded
1093                            let _permit = match &sem {
1094                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
1095                                None => None,
1096                            };
1097
1098                            // Load current pipeline from ArcSwap
1099                            let mut pipe = pipe_ref.load().0.clone();
1100
1101                            // Wait for service ready with circuit breaker backoff
1102                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1103                                if let Some(tx) = reply_tx {
1104                                    let _ = tx.send(Err(e));
1105                                }
1106                                return;
1107                            }
1108
1109                            let result = pipe.call(exchange).await;
1110                            if let Some(tx) = reply_tx {
1111                                let _ = tx.send(result);
1112                            } else if let Err(ref e) = result
1113                                && !matches!(e, CamelError::Stopped)
1114                            {
1115                                error!("Pipeline error: {e}");
1116                            }
1117                        });
1118                    }
1119                })
1120            }
1121        };
1122
1123        // Store handles and update status
1124        let managed = self
1125            .routes
1126            .get_mut(route_id)
1127            .expect("invariant: route must exist after prior existence check");
1128        managed.consumer_handle = Some(consumer_handle);
1129        managed.pipeline_handle = Some(pipeline_handle);
1130        managed.channel_sender = Some(tx_for_storage);
1131
1132        info!(route_id = %route_id, "Route started");
1133        Ok(())
1134    }
1135
1136    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1137        self.stop_route_internal(route_id).await
1138    }
1139
1140    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1141        self.stop_route(route_id).await?;
1142        tokio::time::sleep(Duration::from_millis(100)).await;
1143        self.start_route(route_id).await
1144    }
1145
1146    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1147        // Check route exists and state.
1148        let managed = self
1149            .routes
1150            .get_mut(route_id)
1151            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1152
1153        let consumer_running = handle_is_running(&managed.consumer_handle);
1154        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1155
1156        // Can only suspend from active started state.
1157        if !consumer_running || !pipeline_running {
1158            return Err(CamelError::RouteError(format!(
1159                "Cannot suspend route '{}' with execution lifecycle {}",
1160                route_id,
1161                inferred_lifecycle_label(managed)
1162            )));
1163        }
1164
1165        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1166
1167        // Cancel consumer token only (keep pipeline running)
1168        let managed = self
1169            .routes
1170            .get_mut(route_id)
1171            .expect("invariant: route must exist after prior existence check");
1172        managed.consumer_cancel_token.cancel();
1173
1174        // Take and join consumer handle
1175        let managed = self
1176            .routes
1177            .get_mut(route_id)
1178            .expect("invariant: route must exist after prior existence check");
1179        let consumer_handle = managed.consumer_handle.take();
1180
1181        // Wait for consumer task to complete with timeout
1182        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1183            if let Some(handle) = consumer_handle {
1184                let _ = handle.await;
1185            }
1186        })
1187        .await;
1188
1189        if timeout_result.is_err() {
1190            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1191        }
1192
1193        // Get the managed route again (can't hold across await)
1194        let managed = self
1195            .routes
1196            .get_mut(route_id)
1197            .expect("invariant: route must exist after prior existence check");
1198
1199        // Create fresh cancellation token for consumer (for resume)
1200        managed.consumer_cancel_token = CancellationToken::new();
1201
1202        info!(route_id = %route_id, "Route suspended (pipeline still running)");
1203        Ok(())
1204    }
1205
1206    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1207        // Check route exists and is Suspended-equivalent execution state.
1208        let managed = self
1209            .routes
1210            .get(route_id)
1211            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1212
1213        let consumer_running = handle_is_running(&managed.consumer_handle);
1214        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1215        if consumer_running || !pipeline_running {
1216            return Err(CamelError::RouteError(format!(
1217                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1218                route_id,
1219                inferred_lifecycle_label(managed)
1220            )));
1221        }
1222
1223        // Get the stored channel sender (must exist for a suspended route)
1224        let sender = managed.channel_sender.clone().ok_or_else(|| {
1225            CamelError::RouteError("Suspended route has no channel sender".into())
1226        })?;
1227
1228        // Get from_uri and concurrency for creating new consumer
1229        let from_uri = managed.from_uri.clone();
1230
1231        info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1232
1233        let (consumer, _) = super::consumer_management::create_route_consumer(
1234            &self.registry,
1235            &from_uri,
1236            &ControllerComponentContext::new(
1237                Arc::clone(&self.registry),
1238                Arc::clone(&self.languages),
1239                self.tracer_metrics
1240                    .clone()
1241                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
1242                Arc::clone(&self.platform_service),
1243            ),
1244        )?;
1245
1246        // Get the managed route for mutation
1247        let managed = self
1248            .routes
1249            .get_mut(route_id)
1250            .expect("invariant: route must exist after prior existence check");
1251
1252        // Create child token for consumer lifecycle
1253        let consumer_cancel = managed.consumer_cancel_token.child_token();
1254
1255        let crash_notifier = self.crash_notifier.clone();
1256        let runtime_for_consumer = self.runtime.clone();
1257
1258        // Create ConsumerContext with the stored sender
1259        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1260
1261        // Spawn consumer task
1262        let consumer_handle = super::consumer_management::spawn_consumer_task(
1263            route_id.to_string(),
1264            consumer,
1265            consumer_ctx,
1266            crash_notifier,
1267            runtime_for_consumer,
1268            true,
1269        );
1270
1271        // Store consumer handle and update status
1272        let managed = self
1273            .routes
1274            .get_mut(route_id)
1275            .expect("invariant: route must exist after prior existence check");
1276        managed.consumer_handle = Some(consumer_handle);
1277
1278        info!(route_id = %route_id, "Route resumed");
1279        Ok(())
1280    }
1281
1282    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1283        // Only start routes where auto_startup() == true
1284        // Sort by startup_order() ascending before starting
1285        let route_ids: Vec<String> = {
1286            let mut pairs: Vec<_> = self
1287                .routes
1288                .iter()
1289                .filter(|(_, r)| r.definition.auto_startup())
1290                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1291                .collect();
1292            pairs.sort_by_key(|(_, order)| *order);
1293            pairs.into_iter().map(|(id, _)| id).collect()
1294        };
1295
1296        info!("Starting {} auto-startup routes", route_ids.len());
1297
1298        // Collect errors but continue starting remaining routes
1299        let mut errors: Vec<String> = Vec::new();
1300        for route_id in route_ids {
1301            if let Err(e) = self.start_route(&route_id).await {
1302                errors.push(format!("Route '{}': {}", route_id, e));
1303            }
1304        }
1305
1306        if !errors.is_empty() {
1307            return Err(CamelError::RouteError(format!(
1308                "Failed to start routes: {}",
1309                errors.join(", ")
1310            )));
1311        }
1312
1313        info!("All auto-startup routes started");
1314        Ok(())
1315    }
1316
1317    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1318        // Sort by startup_order descending (reverse order)
1319        let route_ids: Vec<String> = {
1320            let mut pairs: Vec<_> = self
1321                .routes
1322                .iter()
1323                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1324                .collect();
1325            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1326            pairs.into_iter().map(|(id, _)| id).collect()
1327        };
1328
1329        info!("Stopping {} routes", route_ids.len());
1330
1331        for route_id in route_ids {
1332            let _ = self.stop_route(&route_id).await;
1333        }
1334
1335        info!("All routes stopped");
1336        Ok(())
1337    }
1338}
1339
1340#[cfg(test)]
1341#[path = "route_controller_tests.rs"]
1342mod tests;