Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21    BoxProcessor, CamelError, Exchange, IdentityProcessor, LeaderElector, NoOpMetrics,
22    NoopLeaderElector, ProducerContext, RouteController, RuntimeCommand, RuntimeHandle,
23};
24use camel_component_api::{
25    ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
26};
27use camel_endpoint::parse_uri;
28pub use camel_processor::aggregator::SharedLanguageRegistry;
29use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
30use camel_processor::circuit_breaker::CircuitBreakerLayer;
31use camel_processor::error_handler::ErrorHandlerLayer;
32
33use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
34use crate::lifecycle::adapters::route_compiler::{
35    compose_pipeline, compose_traced_pipeline_with_contracts,
36};
37use crate::lifecycle::application::route_definition::{
38    BuilderStep, RouteDefinition, RouteDefinitionInfo,
39};
40use crate::shared::components::domain::Registry;
41use crate::shared::observability::domain::{DetailLevel, TracerConfig};
42use arc_swap::ArcSwap;
43use camel_bean::BeanRegistry;
44
/// Notification sent when a route crashes.
///
/// Delivered on the channel registered via
/// `DefaultRouteController::set_crash_notifier` so a supervisor can react
/// to consumer crashes.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// The ID of the crashed route.
    pub route_id: String,
    /// The error that caused the crash.
    pub error: String,
}
53
/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
///
/// # Safety
///
/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
///
/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
///    (creates a new boxed service from the inner clone).
/// 2. The clone is owned by the calling thread and never shared.
/// 3. ArcSwap guarantees we only get & references (no &mut).
///
/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
/// clone() does not mutate shared state and each thread gets an independent copy.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: see the type-level comment above — the wrapped processor is only
// ever accessed through `&` for cloning, never mutated through a shared ref.
unsafe impl Sync for SyncBoxProcessor {}

/// A route pipeline that can be swapped atomically (hot reload) while
/// in-flight exchanges keep using the previously loaded `Arc`.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
72
/// Pipeline segments for a route whose first top-level `aggregate` step has a
/// timeout-based completion condition. `add_route` splits such a route into
/// the steps before the aggregate, the aggregator configuration itself, and
/// the steps after it (presumably so the aggregator can complete on timeout
/// independently of the pre-segment — confirm in the consumer/pipeline tasks).
pub(super) struct AggregateSplitInfo {
    /// Composed pipeline of the steps preceding the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step that was split out.
    pub(super) agg_config: AggregatorConfig,
    /// Composed pipeline of the steps following the aggregate step.
    pub(super) post_pipeline: SharedPipeline,
}
79
/// Internal state for a managed route.
pub(super) struct ManagedRoute {
    /// The route definition metadata (for introspection).
    pub(super) definition: RouteDefinitionInfo,
    /// Source endpoint URI.
    pub(super) from_uri: String,
    /// Resolved processor pipeline (wrapped for atomic swap).
    pub(super) pipeline: SharedPipeline,
    /// Concurrency model override (if any).
    pub(super) concurrency: Option<ConcurrencyModel>,
    /// Handle for the consumer task (if running).
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping the consumer task.
    /// This allows independent control of the consumer lifecycle (for suspend/resume).
    pub(super) consumer_cancel_token: CancellationToken,
    /// Cancellation token for stopping the pipeline task.
    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
    pub(super) pipeline_cancel_token: CancellationToken,
    /// Channel sender for sending exchanges to the pipeline.
    /// Stored to allow resuming a suspended route without recreating the channel.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// In-flight exchange counter. `None` when UoW is not configured for this route.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Split pipeline segments when the route contains a top-level aggregate
    /// with a timeout completion condition; `None` otherwise (see `add_route`).
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    /// Aggregator service for split aggregate routes.
    /// NOTE(review): populated outside this file — presumably when the route
    /// is started; confirm in `consumer_management`.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
107
108pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
109    handle.as_ref().is_some_and(|h| !h.is_finished())
110}
111
112fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
113    match (
114        handle_is_running(&managed.consumer_handle),
115        handle_is_running(&managed.pipeline_handle),
116    ) {
117        (true, true) => "Started",
118        (false, true) => "Suspended",
119        (true, false) => "Stopping",
120        (false, false) => "Stopped",
121    }
122}
123
124fn find_top_level_aggregate_with_timeout(
125    steps: &[BuilderStep],
126) -> Option<(usize, AggregatorConfig)> {
127    for (i, step) in steps.iter().enumerate() {
128        if let BuilderStep::Aggregate { config } = step {
129            if has_timeout_condition(&config.completion) {
130                return Some((i, config.clone()));
131            }
132            break;
133        }
134    }
135    None
136}
137
/// `ComponentContext` implementation backed by the controller's shared
/// registries, handed to components when resolving endpoints and languages.
pub(crate) struct ControllerComponentContext {
    /// Component registry used to look up components by URI scheme.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Expression-language registry used to look up languages by name.
    languages: SharedLanguageRegistry,
    /// Metrics collector exposed to components.
    metrics: Arc<dyn MetricsCollector>,
    /// Leader elector exposed to components.
    leader_elector: Arc<dyn LeaderElector>,
}
144
impl ControllerComponentContext {
    /// Bundle the controller's shared registries and services into a
    /// component context.
    pub(crate) fn new(
        registry: Arc<std::sync::Mutex<Registry>>,
        languages: SharedLanguageRegistry,
        metrics: Arc<dyn MetricsCollector>,
        leader_elector: Arc<dyn LeaderElector>,
    ) -> Self {
        Self {
            registry,
            languages,
            metrics,
            leader_elector,
        }
    }
}
160
161impl ComponentContext for ControllerComponentContext {
162    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
163        self.registry.lock().ok()?.get(scheme)
164    }
165
166    fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
167        self.languages.lock().ok()?.get(name).cloned()
168    }
169
170    fn metrics(&self) -> Arc<dyn MetricsCollector> {
171        Arc::clone(&self.metrics)
172    }
173
174    fn leader_elector(&self) -> Arc<dyn LeaderElector> {
175        Arc::clone(&self.leader_elector)
176    }
177}
178
179fn is_pending(ex: &Exchange) -> bool {
180    ex.property("CamelAggregatorPending")
181        .and_then(|v| v.as_bool())
182        .unwrap_or(false)
183}
184
/// Wait for a pipeline service to be ready with circuit breaker backoff.
///
/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
/// cancellation checks. It returns `Ok(())` when the service is ready, or
/// `Err(e)` if cancellation occurred or a fatal error was encountered.
async fn ready_with_backoff(
    pipeline: &mut BoxProcessor,
    cancel: &CancellationToken,
) -> Result<(), CamelError> {
    loop {
        match pipeline.ready().await {
            Ok(_) => return Ok(()),
            Err(CamelError::CircuitOpen(ref msg)) => {
                warn!("Circuit open, backing off: {msg}");
                // NOTE(review): the `ready()` call itself is not raced against
                // the cancellation token — cancellation is only observed here,
                // between backoff attempts.
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                        continue;
                    }
                    _ = cancel.cancelled() => {
                        // Shutting down — don't retry.
                        return Err(CamelError::CircuitOpen(msg.clone()));
                    }
                }
            }
            Err(e) => {
                // Any non-circuit error is fatal for readiness: log and bail.
                error!("Pipeline not ready: {e}");
                return Err(e);
            }
        }
    }
}
217
218fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
219    let stamp = std::time::SystemTime::now()
220        .duration_since(std::time::UNIX_EPOCH)
221        .unwrap_or_default()
222        .as_nanos();
223    RuntimeCommand::FailRoute {
224        route_id: route_id.to_string(),
225        error: error.to_string(),
226        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
227        causation_id: None,
228    }
229}
230
231pub(super) async fn publish_runtime_failure(
232    runtime: Option<Weak<dyn RuntimeHandle>>,
233    route_id: &str,
234    error: &str,
235) {
236    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
237        return;
238    };
239    let command = runtime_failure_command(route_id, error);
240    if let Err(runtime_error) = runtime.execute(command).await {
241        warn!(
242            route_id = %route_id,
243            error = %runtime_error,
244            "failed to synchronize route crash with runtime projection"
245        );
246    }
247}
248
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    /// Held as `Weak` so the controller does not keep the runtime alive.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
    /// Leader elector exposed to components via the component context.
    leader_elector: Arc<dyn LeaderElector>,
}
279
280impl DefaultRouteController {
281    /// Create a new `DefaultRouteController` with the given registry.
282    pub fn new(
283        registry: Arc<std::sync::Mutex<Registry>>,
284        leader_elector: Arc<dyn LeaderElector>,
285    ) -> Self {
286        Self::with_beans_and_leader(
287            registry,
288            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
289            leader_elector,
290        )
291    }
292
293    /// Create a new `DefaultRouteController` with shared bean registry.
294    pub fn with_beans(
295        registry: Arc<std::sync::Mutex<Registry>>,
296        beans: Arc<std::sync::Mutex<BeanRegistry>>,
297    ) -> Self {
298        Self::with_beans_and_leader(registry, beans, Arc::new(NoopLeaderElector))
299    }
300
    /// Shared constructor: the public constructors funnel through here so the
    /// default field values are defined in one place.
    fn with_beans_and_leader(
        registry: Arc<std::sync::Mutex<Registry>>,
        beans: Arc<std::sync::Mutex<BeanRegistry>>,
        leader_elector: Arc<dyn LeaderElector>,
    ) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            // Empty language registry by default; see `with_languages`.
            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
            beans,
            runtime: None,
            global_error_handler: None,
            crash_notifier: None,
            tracing_enabled: false,
            tracer_detail_level: DetailLevel::Minimal,
            tracer_metrics: None,
            leader_elector,
        }
    }
320
321    /// Create a new `DefaultRouteController` with shared language registry.
322    pub fn with_languages(
323        registry: Arc<std::sync::Mutex<Registry>>,
324        languages: SharedLanguageRegistry,
325        leader_elector: Arc<dyn LeaderElector>,
326    ) -> Self {
327        Self {
328            routes: HashMap::new(),
329            registry,
330            languages,
331            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
332            runtime: None,
333            global_error_handler: None,
334            crash_notifier: None,
335            tracing_enabled: false,
336            tracer_detail_level: DetailLevel::Minimal,
337            tracer_metrics: None,
338            leader_elector,
339        }
340    }
341
    /// Set runtime handle for ProducerContext creation.
    ///
    /// Only a `Weak` reference is stored, so the controller does not keep
    /// the runtime alive. Overwrites any previously set handle.
    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.runtime = Some(Arc::downgrade(&runtime));
    }

    /// Set the crash notifier for supervision.
    ///
    /// When set, the controller will send a [`CrashNotification`] whenever
    /// a consumer crashes.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }

    /// Set a global error handler applied to all routes without a per-route handler.
    ///
    /// Affects only routes added after this call; existing routes keep the
    /// pipeline they were compiled with.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }

    /// Configure tracing for this route controller.
    ///
    /// Copies the enabled flag, detail level, and metrics collector out of
    /// the config; like the error handler, this applies to routes compiled
    /// after the call.
    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
        self.tracing_enabled = config.enabled;
        self.tracer_detail_level = config.detail_level.clone();
        self.tracer_metrics = config.metrics_collector.clone();
    }
366
367    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
368        let mut producer_ctx = ProducerContext::new();
369        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
370            producer_ctx = producer_ctx.with_runtime(runtime);
371        }
372        Ok(producer_ctx)
373    }
374
375    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
376    fn resolve_error_handler(
377        &self,
378        config: ErrorHandlerConfig,
379        producer_ctx: &ProducerContext,
380        component_ctx: &dyn ComponentContext,
381    ) -> Result<ErrorHandlerLayer, CamelError> {
382        // Resolve DLC URI → producer.
383        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
384            let parsed = parse_uri(uri)?;
385            let component = component_ctx
386                .resolve_component(&parsed.scheme)
387                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
388            let endpoint = component.create_endpoint(uri, component_ctx)?;
389            Some(endpoint.create_producer(producer_ctx)?)
390        } else {
391            None
392        };
393
394        // Resolve per-policy `handled_by` URIs.
395        let mut resolved_policies = Vec::new();
396        for policy in config.policies {
397            let handler_producer = if let Some(ref uri) = policy.handled_by {
398                let parsed = parse_uri(uri)?;
399                let component = component_ctx
400                    .resolve_component(&parsed.scheme)
401                    .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
402                let endpoint = component.create_endpoint(uri, component_ctx)?;
403                Some(endpoint.create_producer(producer_ctx)?)
404            } else {
405                None
406            };
407            resolved_policies.push((policy, handler_producer));
408        }
409
410        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
411    }
412
413    /// Resolve a `UnitOfWorkConfig` into an `(ExchangeUoWLayer, Arc<AtomicU64>)`.
414    /// Returns `Err` if any hook URI cannot be resolved.
415    fn resolve_uow_layer(
416        &self,
417        config: &UnitOfWorkConfig,
418        producer_ctx: &ProducerContext,
419        component_ctx: &dyn ComponentContext,
420        counter: Option<Arc<AtomicU64>>,
421    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
422        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
423            let parsed = parse_uri(uri)?;
424            let component = component_ctx
425                .resolve_component(&parsed.scheme)
426                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
427            let endpoint = component.create_endpoint(uri, component_ctx)?;
428            endpoint.create_producer(producer_ctx).map_err(|e| {
429                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
430            })
431        };
432
433        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
434        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
435
436        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
437        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
438        Ok((layer, counter))
439    }
440
441    /// Resolve BuilderSteps into BoxProcessors.
442    pub(crate) fn resolve_steps(
443        &self,
444        steps: Vec<BuilderStep>,
445        producer_ctx: &ProducerContext,
446        registry: &Arc<std::sync::Mutex<Registry>>,
447    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
448        let component_ctx = Arc::new(ControllerComponentContext::new(
449            Arc::clone(registry),
450            Arc::clone(&self.languages),
451            self.tracer_metrics
452                .clone()
453                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
454            Arc::clone(&self.leader_elector),
455        ));
456
457        super::step_resolution::resolve_steps(
458            steps,
459            producer_ctx,
460            registry,
461            &self.languages,
462            &self.beans,
463            component_ctx,
464        )
465    }
466
    /// Add a route definition to the controller.
    ///
    /// Steps are resolved immediately using the registry. The compiled
    /// pipeline is layered inside-out as: steps → circuit breaker → error
    /// handler → unit-of-work (outermost). The route is stored stopped;
    /// starting it is a separate operation.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - A route with the same ID already exists
    /// - Step resolution fails
    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
        let route_id = definition.route_id().to_string();

        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, "Adding route to controller");

        // Extract definition info for storage before steps are consumed
        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        // Create ProducerContext from self_ref for step resolution
        let producer_ctx = self.build_producer_context()?;

        // Take ownership of steps before resolve_steps consumes them
        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
            Some((idx, agg_config)) => {
                // Split the step list at the aggregate: [pre][aggregate][post].
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // NOTE(review): the main pipeline is left empty for split
                // routes, so the CB/error-handler/UoW layers below wrap an
                // IdentityProcessor rather than the pre/post segments —
                // presumably the segments are driven by the consumer task;
                // confirm.
                vec![]
            }
            None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
        };
        let route_id_for_tracing = route_id.clone();
        let mut pipeline = if processors_with_contracts.is_empty() {
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        // Apply circuit breaker if configured
        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Determine which error handler config to use (per-route takes precedence)
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.leader_elector),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Apply UoW layer outermost (after error handler)
        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.leader_elector),
            );
            let (uow_layer, counter) =
                self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        // Store the route in the stopped state: no handles, fresh tokens.
        self.routes.insert(
            route_id.clone(),
            ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
            },
        );

        Ok(())
    }
608
    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting into the route map.
    ///
    /// Used by hot-reload to prepare a new pipeline for atomic swap without disrupting
    /// the running route. The caller is responsible for swapping via `swap_pipeline`.
    pub fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<BoxProcessor, CamelError> {
        let route_id = def.route_id().to_string();

        let producer_ctx = self.build_producer_context()?;

        // Resolve and compose the step pipeline (traced when enabled).
        let processors_with_contracts =
            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
        let mut pipeline = compose_traced_pipeline_with_contracts(
            processors_with_contracts,
            &route_id,
            self.tracing_enabled,
            self.tracer_detail_level.clone(),
            self.tracer_metrics.clone(),
        );

        // Layer order mirrors `add_route`: circuit breaker, then error
        // handler, then unit-of-work outermost.
        if let Some(cb_config) = def.circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Per-route error handler takes precedence over the global one.
        let eh_config = def
            .error_handler
            .clone()
            .or_else(|| self.global_error_handler.clone());
        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.leader_elector),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Apply UoW layer outermost
        if let Some(uow_config) = &def.unit_of_work {
            // Reuse the running route's in-flight counter (if any) so the
            // swapped pipeline keeps the same in-flight accounting.
            let existing_counter = self
                .routes
                .get(&route_id)
                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));

            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.leader_elector),
            );

            let (uow_layer, _counter) = self.resolve_uow_layer(
                uow_config,
                &producer_ctx,
                &component_ctx,
                existing_counter,
            )?;

            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
        }

        Ok(pipeline)
    }
681
682    /// Remove a route from the controller map.
683    ///
684    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
685    /// Returns an error if the route is still running or does not exist.
686    /// Does not cancel any running tasks — call `stop_route` first.
687    pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
688        let managed = self.routes.get(route_id).ok_or_else(|| {
689            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
690        })?;
691        if handle_is_running(&managed.consumer_handle)
692            || handle_is_running(&managed.pipeline_handle)
693        {
694            return Err(CamelError::RouteError(format!(
695                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
696                route_id,
697                inferred_lifecycle_label(managed)
698            )));
699        }
700        self.routes.remove(route_id);
701        info!(route_id = %route_id, "Route removed from controller");
702        Ok(())
703    }
704
    /// Returns the number of routes in the controller.
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }

    /// Returns the current in-flight exchange count for a route.
    ///
    /// `None` if the route does not exist; `Some(0)` when the route exists
    /// but has no unit-of-work counter configured.
    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
        self.routes.get(route_id).map(|r| {
            r.in_flight
                .as_ref()
                .map_or(0, |c| c.load(Ordering::Relaxed))
        })
    }

    /// Returns `true` if a route with the given ID exists.
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.contains_key(route_id)
    }

    /// Returns all route IDs.
    pub fn route_ids(&self) -> Vec<String> {
        self.routes.keys().cloned().collect()
    }

    /// Returns the source hash recorded in the route's definition, if any
    /// (presumably used for hot-reload change detection — confirm at call sites).
    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.routes
            .get(route_id)
            .and_then(|m| m.definition.source_hash())
    }
733
734    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
735    pub fn auto_startup_route_ids(&self) -> Vec<String> {
736        let mut pairs: Vec<(String, i32)> = self
737            .routes
738            .iter()
739            .filter(|(_, managed)| managed.definition.auto_startup())
740            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
741            .collect();
742        pairs.sort_by_key(|(_, order)| *order);
743        pairs.into_iter().map(|(id, _)| id).collect()
744    }
745
746    /// Returns route IDs sorted by shutdown order (startup order descending).
747    pub fn shutdown_route_ids(&self) -> Vec<String> {
748        let mut pairs: Vec<(String, i32)> = self
749            .routes
750            .iter()
751            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
752            .collect();
753        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
754        pairs.into_iter().map(|(id, _)| id).collect()
755    }
756
757    /// Atomically swap the pipeline of a route.
758    ///
759    /// In-flight requests finish with the old pipeline (kept alive by Arc).
760    /// New requests immediately use the new pipeline.
761    pub fn swap_pipeline(
762        &self,
763        route_id: &str,
764        new_pipeline: BoxProcessor,
765    ) -> Result<(), CamelError> {
766        let managed = self
767            .routes
768            .get(route_id)
769            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
770
771        if managed.aggregate_split.is_some() {
772            tracing::warn!(
773                route_id = %route_id,
774                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
775            );
776        }
777
778        managed
779            .pipeline
780            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
781        info!(route_id = %route_id, "Pipeline swapped atomically");
782        Ok(())
783    }
784
785    /// Returns the from_uri of a route, if it exists.
786    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
787        self.routes.get(route_id).map(|r| r.from_uri.clone())
788    }
789
790    /// Get a clone of the current pipeline for a route.
791    ///
792    /// This is useful for testing and introspection.
793    /// Returns `None` if the route doesn't exist.
794    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
795        self.routes
796            .get(route_id)
797            .map(|r| r.pipeline.load().0.clone())
798    }
799
    /// Internal stop implementation that can set custom status.
    ///
    /// Delegates to the shared `consumer_management` helper so the stop
    /// semantics live in one place.
    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
    }
804
    /// Start a route as part of a hot-reload cycle.
    ///
    /// Currently identical to `start_route`; presumably kept as a separate
    /// entry point so reload flows can diverge later — TODO confirm.
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }
808
    /// Stop a route as part of a hot-reload cycle.
    ///
    /// Currently identical to `stop_route`; presumably kept as a separate
    /// entry point so reload flows can diverge later — TODO confirm.
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
812}
813
#[async_trait::async_trait]
impl RouteController for DefaultRouteController {
    /// Start a route: spawn its consumer task and its pipeline task, wired
    /// together by a bounded mpsc channel of `ExchangeEnvelope`s (capacity 256).
    ///
    /// Idempotent when the route is already fully running. Errors when the
    /// route is suspended (use `resume_route`) or in an inconsistent
    /// half-running state. Aggregate routes with a timeout take a dedicated
    /// branch: pre-pipeline -> aggregator -> post-pipeline in a biased
    /// select loop.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Check if route exists and can be started.
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            // Both tasks alive: nothing to do (idempotent start).
            if consumer_running && pipeline_running {
                return Ok(());
            }
            // Pipeline alive without a consumer is the suspended state.
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            // Consumer alive without a pipeline should not normally occur.
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
        }

        info!(route_id = %route_id, "Starting route");

        // Get the resolved route info
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        // Clone crash notifier for consumer task
        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.leader_elector),
            ),
        )?;

        // Resolve effective concurrency: route override > consumer default
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        // Get the managed route for mutation
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create channel for consumer to send exchanges
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        // Create child tokens for independent lifecycle control
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        // Clone sender for storage (to reuse on resume)
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        // --- Aggregator v2: check for aggregate route with timeout ---
        // Re-borrow: the previous &mut borrow could not be held across the
        // constructor calls above.
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        if let Some(split) = managed.aggregate_split.as_ref() {
            // Channel for aggregations the service completes on its own (e.g.
            // via timeout); these "late" exchanges go straight to the
            // post-pipeline, bypassing the pre-pipeline.
            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);

            let route_cancel_clone = pipeline_cancel.clone();
            let svc = AggregatorService::new(
                split.agg_config.clone(),
                late_tx,
                Arc::clone(&self.languages),
                route_cancel_clone,
            );
            let agg = Arc::new(std::sync::Mutex::new(svc));

            // Keep a handle on the managed route so shutdown/introspection can
            // reach the aggregator later.
            managed.agg_service = Some(Arc::clone(&agg));

            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
            let pre_pipeline = Arc::clone(&split.pre_pipeline);
            let post_pipeline = Arc::clone(&split.post_pipeline);

            // Spawn consumer task (same as normal route)
            let consumer_handle = super::consumer_management::spawn_consumer_task(
                route_id.to_string(),
                consumer,
                consumer_ctx,
                crash_notifier,
                runtime_for_consumer,
                false,
            );

            // Spawn biased select forward loop
            let pipeline_handle = tokio::spawn(async move {
                loop {
                    tokio::select! {
                        // `biased` polls arms top-to-bottom: late completions
                        // are drained before new input is accepted.
                        biased;

                        late_ex = async {
                            let mut rx = late_rx.lock().await;
                            rx.recv().await
                        } => {
                            match late_ex {
                                Some(ex) => {
                                    // Load post-pipeline per exchange so hot-reloaded
                                    // pipelines take effect immediately.
                                    let pipe = post_pipeline.load();
                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
                                    }
                                }
                                None => return,
                            }
                        }

                        envelope_opt = rx.recv() => {
                            match envelope_opt {
                                Some(envelope) => {
                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
                                    // Stage 1: pre-pipeline. Failures are reported to the
                                    // waiting caller (if any) and the exchange is dropped.
                                    let pre_pipe = pre_pipeline.load();
                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
                                        Ok(ex) => ex,
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                            continue;
                                        }
                                    };

                                    // Stage 2: aggregation. Clone the service under the
                                    // std Mutex and drop the guard BEFORE awaiting — a
                                    // std guard must not be held across an await point.
                                    let ex = {
                                        let cloned_svc = agg
                                            .lock()
                                            .expect("mutex poisoned: another thread panicked while holding this lock")
                                            .clone();
                                        cloned_svc.oneshot(ex).await
                                    };

                                    match ex {
                                        Ok(ex) => {
                                            // is_pending: the aggregator is still holding the
                                            // group open — presumably waiting for more
                                            // correlated exchanges; confirm against
                                            // AggregatorService. Only completed groups
                                            // continue into the post-pipeline.
                                            if !is_pending(&ex) {
                                                let post_pipe = post_pipeline.load();
                                                let out = post_pipe.0.clone().oneshot(ex).await;
                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
                                            } else if let Some(tx) = reply_tx {
                                                let _ = tx.send(Ok(ex));
                                            }
                                        }
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                        }
                                    }
                                }
                                None => return,
                            }
                        }

                        _ = pipeline_cancel.cancelled() => {
                            // Shutdown: force-flush every pending aggregation, then
                            // drain whatever late completions are already queued.
                            {
                                let guard = agg
                                    .lock()
                                    .expect("mutex poisoned: another thread panicked while holding this lock");
                                guard.force_complete_all();
                            }
                            let mut rx_guard = late_rx.lock().await;
                            while let Ok(late_ex) = rx_guard.try_recv() {
                                let pipe = post_pipeline.load();
                                let _ = pipe.0.clone().oneshot(late_ex).await;
                            }
                            break;
                        }
                    }
                }
            });

            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist");
            managed.consumer_handle = Some(consumer_handle);
            managed.pipeline_handle = Some(pipeline_handle);
            managed.channel_sender = Some(tx_for_storage);

            info!(route_id = %route_id, "Route started (aggregate with timeout)");
            return Ok(());
        }
        // --- End aggregator v2 branch ---

        // Start consumer in background task.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            false,
        );

        // Spawn pipeline task with its own cancellation token
        let pipeline_handle = match effective_concurrency {
            // Sequential: exchanges are processed one at a time, in order.
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    loop {
                        // Use select! to exit promptly on cancellation even when idle
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, // Channel closed
                            },
                            _ = pipeline_cancel.cancelled() => {
                                // Cancellation requested - exit gracefully
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
                        let mut pipeline = pipeline.load().0.clone();

                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            // Fire-and-forget exchange: no caller awaits the result,
                            // so surface the failure in the log instead.
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            // Concurrent: each exchange runs in its own spawned task,
            // optionally bounded by a semaphore of `max` permits.
            ConcurrencyModel::Concurrent { max } => {
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        // Use select! to exit promptly on cancellation even when idle
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, // Channel closed
                            },
                            _ = pipeline_cancel.cancelled() => {
                                // Cancellation requested - exit gracefully
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        tokio::spawn(async move {
                            // Acquire semaphore permit if bounded
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            // Load current pipeline from ArcSwap
                            let mut pipe = pipe_ref.load().0.clone();

                            // Wait for service ready with circuit breaker backoff
                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };

        // Store handles and update status
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }

    /// Stop a route by delegating to the shared internal stop path.
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }

    /// Stop, pause briefly, then start the route again.
    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await?;
        // Fixed grace period so stopped tasks can wind down before respawn.
        // NOTE(review): this is a delay, not a completion guarantee.
        tokio::time::sleep(Duration::from_millis(100)).await;
        self.start_route(route_id).await
    }

    /// Suspend a route: stop the consumer but keep the pipeline task alive,
    /// so in-flight exchanges finish and the channel sender remains usable
    /// by `resume_route`.
    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Check route exists and state.
        let managed = self
            .routes
            .get_mut(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);

        // Can only suspend from active started state.
        if !consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot suspend route '{}' with execution lifecycle {}",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");

        // Cancel consumer token only (keep pipeline running)
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_cancel_token.cancel();

        // Take and join consumer handle
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        let consumer_handle = managed.consumer_handle.take();

        // Wait for consumer task to complete with timeout
        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
            if let Some(handle) = consumer_handle {
                let _ = handle.await;
            }
        })
        .await;

        if timeout_result.is_err() {
            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
        }

        // Get the managed route again (can't hold across await)
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create fresh cancellation token for consumer (for resume)
        managed.consumer_cancel_token = CancellationToken::new();

        info!(route_id = %route_id, "Route suspended (pipeline still running)");
        Ok(())
    }

    /// Resume a suspended route by spawning a fresh consumer that feeds the
    /// still-running pipeline through the stored channel sender.
    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Check route exists and is Suspended-equivalent execution state.
        let managed = self
            .routes
            .get(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);
        if consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        // Get the stored channel sender (must exist for a suspended route)
        let sender = managed.channel_sender.clone().ok_or_else(|| {
            CamelError::RouteError("Suspended route has no channel sender".into())
        })?;

        // Get from_uri and concurrency for creating new consumer
        let from_uri = managed.from_uri.clone();

        info!(route_id = %route_id, "Resuming route (spawning consumer only)");

        let (consumer, _) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
                Arc::clone(&self.leader_elector),
            ),
        )?;

        // Get the managed route for mutation
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create child token for consumer lifecycle
        let consumer_cancel = managed.consumer_cancel_token.child_token();

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        // Create ConsumerContext with the stored sender
        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());

        // Spawn consumer task
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            true,
        );

        // Store consumer handle and update status
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);

        info!(route_id = %route_id, "Route resumed");
        Ok(())
    }

    /// Start every route whose definition has `auto_startup() == true`, in
    /// ascending `startup_order()`. Failures are collected and reported
    /// together after attempting all routes.
    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
        // Only start routes where auto_startup() == true
        // Sort by startup_order() ascending before starting
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .filter(|(_, r)| r.definition.auto_startup())
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| *order);
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Starting {} auto-startup routes", route_ids.len());

        // Collect errors but continue starting remaining routes
        let mut errors: Vec<String> = Vec::new();
        for route_id in route_ids {
            if let Err(e) = self.start_route(&route_id).await {
                errors.push(format!("Route '{}': {}", route_id, e));
            }
        }

        if !errors.is_empty() {
            return Err(CamelError::RouteError(format!(
                "Failed to start routes: {}",
                errors.join(", ")
            )));
        }

        info!("All auto-startup routes started");
        Ok(())
    }

    /// Stop all routes in descending `startup_order()`. Individual stop
    /// errors are ignored so every route gets a stop attempt.
    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
        // Sort by startup_order descending (reverse order)
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Stopping {} routes", route_ids.len());

        for route_id in route_ids {
            // Best-effort: errors from individual routes are deliberately
            // swallowed so a failing route does not block the rest.
            let _ = self.stop_route(&route_id).await;
        }

        info!("All routes stopped");
        Ok(())
    }
}
1335
// Unit tests live in a sibling file to keep this module focused on the
// implementation.
#[cfg(test)]
#[path = "route_controller_tests.rs"]
mod tests;