Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::{Mutex, mpsc};
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21    BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProducerContext,
22    RouteController, RuntimeCommand, RuntimeHandle, Value, body::Body,
23};
24use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
25use camel_endpoint::parse_uri;
26use camel_language_api::{Expression, Language, LanguageError, Predicate};
27pub use camel_processor::aggregator::SharedLanguageRegistry;
28use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
29use camel_processor::circuit_breaker::CircuitBreakerLayer;
30use camel_processor::error_handler::ErrorHandlerLayer;
31use camel_processor::script_mutator::ScriptMutator;
32use camel_processor::{ChoiceService, WhenClause};
33
34use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
35use crate::lifecycle::adapters::route_compiler::{
36    compose_pipeline, compose_traced_pipeline_with_contracts,
37};
38use crate::lifecycle::application::route_definition::{
39    BuilderStep, LanguageExpressionDef, RouteDefinition, RouteDefinitionInfo, ValueSourceDef,
40};
41use crate::shared::components::domain::Registry;
42use crate::shared::observability::domain::{DetailLevel, TracerConfig};
43use arc_swap::ArcSwap;
44use camel_bean::BeanRegistry;
45
/// Notification sent when a route crashes.
///
/// Delivered over the channel installed with
/// [`DefaultRouteController::set_crash_notifier`]. Used by
/// [`SupervisingRouteController`](crate::supervising_route_controller::SupervisingRouteController)
/// to monitor and restart failed routes.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// The ID of the crashed route.
    pub route_id: String,
    /// Text of the error that caused the crash.
    pub error: String,
}
57
/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
///
/// # Safety
///
/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
///
/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
///    (creates a new boxed service from the inner clone).
/// 2. The clone is owned by the calling thread and never shared.
/// 3. ArcSwap guarantees we only get & references (no &mut).
///
/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
/// clone() does not mutate shared state and each thread gets an independent copy.
///
/// NOTE(review): point 1 holds only if every service boxed into a pipeline has a
/// `Clone` impl that is safe to call concurrently through `&self`. For example,
/// `clone()` must not touch `Cell`/`RefCell` state, which is `Send` but not `Sync`.
/// The compiler does not enforce this. Confirm it for every processor type, or
/// consider a lock-based wrapper instead.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: shared (`&`) access to the inner `BoxProcessor` is restricted to
// `clone()`, which must not mutate shared state (see the `# Safety` section above).
unsafe impl Sync for SyncBoxProcessor {}
74
/// An `Arc`-shared pipeline slot whose processor can be replaced atomically
/// (see `RouteControllerInternal::swap_pipeline`, used for hot-reload).
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
76
/// Internal trait extending [`RouteController`] with methods needed by [`CamelContext`]
/// that are not part of the public lifecycle API.
///
/// Implemented by [`DefaultRouteController`]. A supervising controller
/// (`SupervisingRouteController`) is intended to implement it as well, so that
/// `CamelContext` can hold either one as `Arc<Mutex<dyn RouteControllerInternal>>`.
#[async_trait::async_trait]
pub trait RouteControllerInternal: RouteController + Send {
    /// Add a route definition to the controller.
    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError>;

    /// Atomically swap the pipeline of a running route (for hot-reload).
    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError>;

    /// Returns the `from_uri` of a route by ID.
    fn route_from_uri(&self, route_id: &str) -> Option<String>;

    /// Set a global error handler applied to all routes.
    fn set_error_handler(&mut self, config: ErrorHandlerConfig);

    /// Set the self-reference needed to create `ProducerContext`.
    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>);

    /// Set runtime handle for ProducerContext command/query access.
    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>);

    /// Returns the number of routes in the controller.
    fn route_count(&self) -> usize;

    /// Returns the current in-flight count for a route, or `None` if route not found.
    ///
    /// NOTE(review): a managed route only tracks in-flight exchanges when a unit of
    /// work is configured. Confirm whether implementations return `None` or `Some(0)`
    /// for an existing route without one.
    fn in_flight_count(&self, route_id: &str) -> Option<u64>;

    /// Returns `true` if a route with the given ID exists.
    fn route_exists(&self, route_id: &str) -> bool;

    /// Returns all route IDs.
    fn route_ids(&self) -> Vec<String>;

    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
    fn auto_startup_route_ids(&self) -> Vec<String>;

    /// Returns route IDs sorted by shutdown order (startup order descending).
    fn shutdown_route_ids(&self) -> Vec<String>;

    /// Configure tracing from a [`TracerConfig`].
    fn set_tracer_config(&mut self, config: &TracerConfig);

    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting it into the route map.
    /// Used by hot-reload to prepare a new pipeline for atomic swap.
    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError>;

    /// Remove a route from the controller map (route must be stopped first).
    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Start a route by ID (for use by hot-reload, where async_trait is required).
    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Stop a route by ID (for use by hot-reload, where async_trait is required).
    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
}
136
/// A route pipeline split around a top-level aggregate step.
///
/// NOTE(review): this reading comes from the field names and from
/// `find_top_level_aggregate_with_timeout`. Presumably `pre_pipeline` runs the
/// steps before the aggregate, `agg_config` configures the aggregator, and
/// `post_pipeline` runs the steps after it. Confirm against the code that builds
/// this value.
struct AggregateSplitInfo {
    pre_pipeline: SharedPipeline,
    agg_config: AggregatorConfig,
    post_pipeline: SharedPipeline,
}
143
/// Internal state for a managed route.
///
/// No lifecycle state is stored explicitly. It can be inferred from whether
/// `consumer_handle` and `pipeline_handle` are still running (see
/// `inferred_lifecycle_label`).
struct ManagedRoute {
    /// The route definition metadata (for introspection).
    definition: RouteDefinitionInfo,
    /// Source endpoint URI.
    from_uri: String,
    /// Resolved processor pipeline (wrapped for atomic swap).
    pipeline: SharedPipeline,
    /// Concurrency model override (if any).
    concurrency: Option<ConcurrencyModel>,
    /// Handle for the consumer task (if running).
    consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping the consumer task.
    /// This allows independent control of the consumer lifecycle (for suspend/resume).
    consumer_cancel_token: CancellationToken,
    /// Cancellation token for stopping the pipeline task.
    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
    pipeline_cancel_token: CancellationToken,
    /// Channel sender for sending exchanges to the pipeline.
    /// Stored to allow resuming a suspended route without recreating the channel.
    channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// In-flight exchange counter. `None` when UoW is not configured for this route.
    in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Split pipelines when the route was divided around a top-level aggregate.
    /// NOTE(review): presumably set only when `find_top_level_aggregate_with_timeout`
    /// finds an aggregate with a timeout condition. Confirm.
    aggregate_split: Option<AggregateSplitInfo>,
    /// Shared aggregator service for this route, if any.
    /// NOTE(review): presumably the aggregator that sits between `aggregate_split`'s
    /// pre/post pipelines. Confirm.
    agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
171
172fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
173    handle.as_ref().is_some_and(|h| !h.is_finished())
174}
175
176fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
177    match (
178        handle_is_running(&managed.consumer_handle),
179        handle_is_running(&managed.pipeline_handle),
180    ) {
181        (true, true) => "Started",
182        (false, true) => "Suspended",
183        (true, false) => "Stopping",
184        (false, false) => "Stopped",
185    }
186}
187
188fn find_top_level_aggregate_with_timeout(
189    steps: &[BuilderStep],
190) -> Option<(usize, AggregatorConfig)> {
191    for (i, step) in steps.iter().enumerate() {
192        if let BuilderStep::Aggregate { config } = step {
193            if has_timeout_condition(&config.completion) {
194                return Some((i, config.clone()));
195            }
196            break;
197        }
198    }
199    None
200}
201
202fn is_pending(ex: &Exchange) -> bool {
203    ex.property("CamelAggregatorPending")
204        .and_then(|v| v.as_bool())
205        .unwrap_or(false)
206}
207
208/// Wait for a pipeline service to be ready with circuit breaker backoff.
209///
210/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
211/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
212/// cancellation checks. It returns `Ok(())` when the service is ready, or
213/// `Err(e)` if cancellation occurred or a fatal error was encountered.
214async fn ready_with_backoff(
215    pipeline: &mut BoxProcessor,
216    cancel: &CancellationToken,
217) -> Result<(), CamelError> {
218    loop {
219        match pipeline.ready().await {
220            Ok(_) => return Ok(()),
221            Err(CamelError::CircuitOpen(ref msg)) => {
222                warn!("Circuit open, backing off: {msg}");
223                tokio::select! {
224                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
225                        continue;
226                    }
227                    _ = cancel.cancelled() => {
228                        // Shutting down — don't retry.
229                        return Err(CamelError::CircuitOpen(msg.clone()));
230                    }
231                }
232            }
233            Err(e) => {
234                error!("Pipeline not ready: {e}");
235                return Err(e);
236            }
237        }
238    }
239}
240
241fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
242    let stamp = std::time::SystemTime::now()
243        .duration_since(std::time::UNIX_EPOCH)
244        .unwrap_or_default()
245        .as_nanos();
246    RuntimeCommand::FailRoute {
247        route_id: route_id.to_string(),
248        error: error.to_string(),
249        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
250        causation_id: None,
251    }
252}
253
254async fn publish_runtime_failure(
255    runtime: Option<Weak<dyn RuntimeHandle>>,
256    route_id: &str,
257    error: &str,
258) {
259    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
260        return;
261    };
262    let command = runtime_failure_command(route_id, error);
263    if let Err(runtime_error) = runtime.execute(command).await {
264        warn!(
265            route_id = %route_id,
266            error = %runtime_error,
267            "failed to synchronize route crash with runtime projection"
268        );
269    }
270}
271
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
///
/// Construct it with [`DefaultRouteController::new`],
/// [`DefaultRouteController::with_beans`] or
/// [`DefaultRouteController::with_languages`]. Then install the optional wiring
/// through `set_self_ref`, `set_runtime_handle`, `set_crash_notifier`,
/// `set_error_handler` and `set_tracer_config`.
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Self-reference for creating ProducerContext (behind a `tokio::sync::Mutex`).
    /// Set after construction via `set_self_ref()`.
    self_ref: Option<Arc<Mutex<dyn RouteController>>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    /// Held as a `Weak`, so the controller never keeps the runtime alive. It is
    /// upgraded on demand, and skipped once the runtime has been dropped.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
}
304
305impl DefaultRouteController {
306    /// Create a new `DefaultRouteController` with the given registry.
307    pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
308        Self::with_beans(
309            registry,
310            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
311        )
312    }
313
314    /// Create a new `DefaultRouteController` with shared bean registry.
315    pub fn with_beans(
316        registry: Arc<std::sync::Mutex<Registry>>,
317        beans: Arc<std::sync::Mutex<BeanRegistry>>,
318    ) -> Self {
319        Self {
320            routes: HashMap::new(),
321            registry,
322            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
323            beans,
324            self_ref: None,
325            runtime: None,
326            global_error_handler: None,
327            crash_notifier: None,
328            tracing_enabled: false,
329            tracer_detail_level: DetailLevel::Minimal,
330            tracer_metrics: None,
331        }
332    }
333
334    /// Create a new `DefaultRouteController` with shared language registry.
335    pub fn with_languages(
336        registry: Arc<std::sync::Mutex<Registry>>,
337        languages: SharedLanguageRegistry,
338    ) -> Self {
339        Self {
340            routes: HashMap::new(),
341            registry,
342            languages,
343            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
344            self_ref: None,
345            runtime: None,
346            global_error_handler: None,
347            crash_notifier: None,
348            tracing_enabled: false,
349            tracer_detail_level: DetailLevel::Minimal,
350            tracer_metrics: None,
351        }
352    }
353
354    /// Set the self-reference for creating ProducerContext.
355    ///
356    /// This must be called after wrapping the controller in `Arc<Mutex<>>`.
357    pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
358        self.self_ref = Some(self_ref);
359    }
360
361    /// Set runtime handle for ProducerContext creation.
362    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
363        self.runtime = Some(Arc::downgrade(&runtime));
364    }
365
366    /// Get the self-reference, if set.
367    ///
368    /// Used by [`SupervisingRouteController`](crate::supervising_route_controller::SupervisingRouteController)
369    /// to spawn the supervision loop.
370    pub fn self_ref_for_supervision(&self) -> Option<Arc<Mutex<dyn RouteController>>> {
371        self.self_ref.clone()
372    }
373
374    /// Get runtime handle for supervision-triggered lifecycle commands, if set.
375    pub fn runtime_handle_for_supervision(&self) -> Option<Arc<dyn RuntimeHandle>> {
376        self.runtime.as_ref().and_then(Weak::upgrade)
377    }
378
379    /// Set the crash notifier for supervision.
380    ///
381    /// When set, the controller will send a [`CrashNotification`] whenever
382    /// a consumer crashes.
383    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
384        self.crash_notifier = Some(tx);
385    }
386
387    /// Set a global error handler applied to all routes without a per-route handler.
388    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
389        self.global_error_handler = Some(config);
390    }
391
392    /// Configure tracing for this route controller.
393    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
394        self.tracing_enabled = config.enabled;
395        self.tracer_detail_level = config.detail_level.clone();
396        self.tracer_metrics = config.metrics_collector.clone();
397    }
398
399    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
400        let mut producer_ctx = ProducerContext::new();
401        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
402            producer_ctx = producer_ctx.with_runtime(runtime);
403        }
404        Ok(producer_ctx)
405    }
406
407    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
408    fn resolve_error_handler(
409        &self,
410        config: ErrorHandlerConfig,
411        producer_ctx: &ProducerContext,
412        registry: &Registry,
413    ) -> Result<ErrorHandlerLayer, CamelError> {
414        // Resolve DLC URI → producer.
415        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
416            let parsed = parse_uri(uri)?;
417            let component = registry.get_or_err(&parsed.scheme)?;
418            let endpoint = component.create_endpoint(uri)?;
419            Some(endpoint.create_producer(producer_ctx)?)
420        } else {
421            None
422        };
423
424        // Resolve per-policy `handled_by` URIs.
425        let mut resolved_policies = Vec::new();
426        for policy in config.policies {
427            let handler_producer = if let Some(ref uri) = policy.handled_by {
428                let parsed = parse_uri(uri)?;
429                let component = registry.get_or_err(&parsed.scheme)?;
430                let endpoint = component.create_endpoint(uri)?;
431                Some(endpoint.create_producer(producer_ctx)?)
432            } else {
433                None
434            };
435            resolved_policies.push((policy, handler_producer));
436        }
437
438        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
439    }
440
441    /// Resolve a `UnitOfWorkConfig` into an `(ExchangeUoWLayer, Arc<AtomicU64>)`.
442    /// Returns `Err` if any hook URI cannot be resolved.
443    fn resolve_uow_layer(
444        &self,
445        config: &UnitOfWorkConfig,
446        producer_ctx: &ProducerContext,
447        registry: &Registry,
448        counter: Option<Arc<AtomicU64>>,
449    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
450        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
451            let parsed = parse_uri(uri)?;
452            let component = registry.get_or_err(&parsed.scheme)?;
453            let endpoint = component.create_endpoint(uri)?;
454            endpoint.create_producer(producer_ctx).map_err(|e| {
455                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
456            })
457        };
458
459        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
460        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
461
462        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
463        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
464        Ok((layer, counter))
465    }
466
467    fn resolve_language(&self, language: &str) -> Result<Arc<dyn Language>, CamelError> {
468        let guard = self
469            .languages
470            .lock()
471            .expect("mutex poisoned: another thread panicked while holding this lock");
472        guard.get(language).cloned().ok_or_else(|| {
473            CamelError::RouteError(format!(
474                "language `{language}` is not registered in CamelContext"
475            ))
476        })
477    }
478
479    fn compile_language_expression(
480        &self,
481        expression: &LanguageExpressionDef,
482    ) -> Result<Arc<dyn Expression>, CamelError> {
483        let language = self.resolve_language(&expression.language)?;
484        let compiled = language
485            .create_expression(&expression.source)
486            .map_err(|e| {
487                CamelError::RouteError(format!(
488                    "failed to compile {} expression `{}`: {e}",
489                    expression.language, expression.source
490                ))
491            })?;
492        Ok(Arc::from(compiled))
493    }
494
495    fn compile_language_predicate(
496        &self,
497        expression: &LanguageExpressionDef,
498    ) -> Result<Arc<dyn Predicate>, CamelError> {
499        let language = self.resolve_language(&expression.language)?;
500        let compiled = language.create_predicate(&expression.source).map_err(|e| {
501            CamelError::RouteError(format!(
502                "failed to compile {} predicate `{}`: {e}",
503                expression.language, expression.source
504            ))
505        })?;
506        Ok(Arc::from(compiled))
507    }
508
509    fn compile_filter_predicate(
510        &self,
511        expression: &LanguageExpressionDef,
512    ) -> Result<FilterPredicate, CamelError> {
513        let predicate = self.compile_language_predicate(expression)?;
514        Ok(Arc::new(move |exchange: &Exchange| {
515            predicate.matches(exchange).unwrap_or(false)
516        }))
517    }
518
519    fn value_to_body(value: Value) -> Body {
520        match value {
521            Value::Null => Body::Empty,
522            Value::String(text) => Body::Text(text),
523            other => Body::Json(other),
524        }
525    }
526
527    /// Resolve BuilderSteps into BoxProcessors.
528    pub(crate) fn resolve_steps(
529        &self,
530        steps: Vec<BuilderStep>,
531        producer_ctx: &ProducerContext,
532        registry: &Arc<std::sync::Mutex<Registry>>,
533    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
534        let resolve_producer = |uri: &str| -> Result<BoxProcessor, CamelError> {
535            let parsed = parse_uri(uri)?;
536            let registry_guard = registry
537                .lock()
538                .expect("mutex poisoned: another thread panicked while holding this lock");
539            let component = registry_guard.get_or_err(&parsed.scheme)?;
540            let endpoint = component.create_endpoint(uri)?;
541            endpoint.create_producer(producer_ctx)
542        };
543
544        let mut processors: Vec<(BoxProcessor, Option<camel_api::BodyType>)> = Vec::new();
545        for step in steps {
546            match step {
547                BuilderStep::Processor(svc) => {
548                    processors.push((svc, None));
549                }
550                BuilderStep::To(uri) => {
551                    let parsed = parse_uri(&uri)?;
552                    let registry_guard = registry
553                        .lock()
554                        .expect("mutex poisoned: another thread panicked while holding this lock");
555                    let component = registry_guard.get_or_err(&parsed.scheme)?;
556                    let endpoint = component.create_endpoint(&uri)?;
557                    let contract = endpoint.body_contract();
558                    let producer = endpoint.create_producer(producer_ctx)?;
559                    processors.push((producer, contract));
560                }
561                BuilderStep::Stop => {
562                    processors.push((BoxProcessor::new(camel_processor::StopService), None));
563                }
564                BuilderStep::Log { level, message } => {
565                    let svc = camel_processor::LogProcessor::new(level, message);
566                    processors.push((BoxProcessor::new(svc), None));
567                }
568                BuilderStep::DeclarativeSetHeader { key, value } => match value {
569                    ValueSourceDef::Literal(value) => {
570                        let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
571                        processors.push((BoxProcessor::new(svc), None));
572                    }
573                    ValueSourceDef::Expression(expression) => {
574                        let expression = self.compile_language_expression(&expression)?;
575                        let svc = camel_processor::DynamicSetHeader::new(
576                            IdentityProcessor,
577                            key,
578                            move |exchange: &Exchange| {
579                                expression.evaluate(exchange).unwrap_or(Value::Null)
580                            },
581                        );
582                        processors.push((BoxProcessor::new(svc), None));
583                    }
584                },
585                BuilderStep::DeclarativeSetBody { value } => match value {
586                    ValueSourceDef::Literal(value) => {
587                        let body = Self::value_to_body(value);
588                        let svc = camel_processor::SetBody::new(
589                            IdentityProcessor,
590                            move |_exchange: &Exchange| body.clone(),
591                        );
592                        processors.push((BoxProcessor::new(svc), None));
593                    }
594                    ValueSourceDef::Expression(expression) => {
595                        let expression = self.compile_language_expression(&expression)?;
596                        let svc = camel_processor::SetBody::new(
597                            IdentityProcessor,
598                            move |exchange: &Exchange| {
599                                let value = expression.evaluate(exchange).unwrap_or(Value::Null);
600                                Self::value_to_body(value)
601                            },
602                        );
603                        processors.push((BoxProcessor::new(svc), None));
604                    }
605                },
606                BuilderStep::DeclarativeFilter { predicate, steps } => {
607                    let predicate = self.compile_filter_predicate(&predicate)?;
608                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
609                    let sub_processors: Vec<BoxProcessor> =
610                        sub_pairs.into_iter().map(|(p, _)| p).collect();
611                    let sub_pipeline = compose_pipeline(sub_processors);
612                    let svc =
613                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
614                    processors.push((BoxProcessor::new(svc), None));
615                }
616                BuilderStep::DeclarativeChoice { whens, otherwise } => {
617                    let mut when_clauses = Vec::new();
618                    for when_step in whens {
619                        let predicate = self.compile_filter_predicate(&when_step.predicate)?;
620                        let sub_pairs =
621                            self.resolve_steps(when_step.steps, producer_ctx, registry)?;
622                        let sub_processors: Vec<BoxProcessor> =
623                            sub_pairs.into_iter().map(|(p, _)| p).collect();
624                        let pipeline = compose_pipeline(sub_processors);
625                        when_clauses.push(WhenClause {
626                            predicate,
627                            pipeline,
628                        });
629                    }
630                    let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
631                        let sub_pairs =
632                            self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
633                        let sub_processors: Vec<BoxProcessor> =
634                            sub_pairs.into_iter().map(|(p, _)| p).collect();
635                        Some(compose_pipeline(sub_processors))
636                    } else {
637                        None
638                    };
639                    let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
640                    processors.push((BoxProcessor::new(svc), None));
641                }
642                BuilderStep::DeclarativeScript { expression } => {
643                    let lang = self.resolve_language(&expression.language)?;
644                    match lang.create_mutating_expression(&expression.source) {
645                        Ok(mut_expr) => {
646                            processors
647                                .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
648                        }
649                        Err(LanguageError::NotSupported { .. }) => {
650                            // Graceful degradation: YAML declarative routes fall back to read-only
651                            // Expression → SetBody when the language doesn't support MutatingExpression.
652                            // This preserves backwards compatibility for languages like Simple that
653                            // only implement Expression. Contrast with the explicit .script() DSL step
654                            // which hard-errors on NotSupported (user opted in to mutation semantics).
655                            // TODO: add integration test asserting Simple language falls back to
656                            // read-only path (requires full CamelContext test harness).
657                            let expression = self.compile_language_expression(&expression)?;
658                            let svc = camel_processor::SetBody::new(
659                                IdentityProcessor,
660                                move |exchange: &Exchange| {
661                                    let value =
662                                        expression.evaluate(exchange).unwrap_or(Value::Null);
663                                    Self::value_to_body(value)
664                                },
665                            );
666                            processors.push((BoxProcessor::new(svc), None));
667                        }
668                        Err(e) => {
669                            return Err(CamelError::RouteError(format!(
670                                "Failed to create mutating expression for language '{}': {}",
671                                expression.language, e
672                            )));
673                        }
674                    }
675                }
676                BuilderStep::Split { config, steps } => {
677                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
678                    let sub_processors: Vec<BoxProcessor> =
679                        sub_pairs.into_iter().map(|(p, _)| p).collect();
680                    let sub_pipeline = compose_pipeline(sub_processors);
681                    let splitter =
682                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
683                    processors.push((BoxProcessor::new(splitter), None));
684                }
685                BuilderStep::DeclarativeSplit {
686                    expression,
687                    aggregation,
688                    parallel,
689                    parallel_limit,
690                    stop_on_exception,
691                    steps,
692                } => {
693                    let lang_expr = self.compile_language_expression(&expression)?;
694                    let split_fn = move |exchange: &Exchange| {
695                        let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
696                        match value {
697                            Value::String(s) => s
698                                .lines()
699                                .filter(|line| !line.is_empty())
700                                .map(|line| {
701                                    let mut fragment = exchange.clone();
702                                    fragment.input.body = Body::from(line.to_string());
703                                    fragment
704                                })
705                                .collect(),
706                            Value::Array(arr) => arr
707                                .into_iter()
708                                .map(|v| {
709                                    let mut fragment = exchange.clone();
710                                    fragment.input.body = Body::from(v);
711                                    fragment
712                                })
713                                .collect(),
714                            _ => vec![exchange.clone()],
715                        }
716                    };
717
718                    let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
719                        .aggregation(aggregation)
720                        .parallel(parallel)
721                        .stop_on_exception(stop_on_exception);
722                    if let Some(limit) = parallel_limit {
723                        config = config.parallel_limit(limit);
724                    }
725
726                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
727                    let sub_processors: Vec<BoxProcessor> =
728                        sub_pairs.into_iter().map(|(p, _)| p).collect();
729                    let sub_pipeline = compose_pipeline(sub_processors);
730                    let splitter =
731                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
732                    processors.push((BoxProcessor::new(splitter), None));
733                }
734                BuilderStep::Aggregate { config } => {
735                    let (late_tx, _late_rx) = mpsc::channel(256);
736                    let registry: SharedLanguageRegistry =
737                        Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
738                    let cancel = CancellationToken::new();
739                    let svc =
740                        camel_processor::AggregatorService::new(config, late_tx, registry, cancel);
741                    processors.push((BoxProcessor::new(svc), None));
742                }
743                BuilderStep::Filter { predicate, steps } => {
744                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
745                    let sub_processors: Vec<BoxProcessor> =
746                        sub_pairs.into_iter().map(|(p, _)| p).collect();
747                    let sub_pipeline = compose_pipeline(sub_processors);
748                    let svc =
749                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
750                    processors.push((BoxProcessor::new(svc), None));
751                }
752                BuilderStep::Choice { whens, otherwise } => {
753                    // Resolve each when clause's sub-steps into a pipeline.
754                    let mut when_clauses = Vec::new();
755                    for when_step in whens {
756                        let sub_pairs =
757                            self.resolve_steps(when_step.steps, producer_ctx, registry)?;
758                        let sub_processors: Vec<BoxProcessor> =
759                            sub_pairs.into_iter().map(|(p, _)| p).collect();
760                        let pipeline = compose_pipeline(sub_processors);
761                        when_clauses.push(WhenClause {
762                            predicate: when_step.predicate,
763                            pipeline,
764                        });
765                    }
766                    // Resolve otherwise branch (if present).
767                    let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
768                        let sub_pairs =
769                            self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
770                        let sub_processors: Vec<BoxProcessor> =
771                            sub_pairs.into_iter().map(|(p, _)| p).collect();
772                        Some(compose_pipeline(sub_processors))
773                    } else {
774                        None
775                    };
776                    let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
777                    processors.push((BoxProcessor::new(svc), None));
778                }
779                BuilderStep::WireTap { uri } => {
780                    let producer = resolve_producer(&uri)?;
781                    let svc = camel_processor::WireTapService::new(producer);
782                    processors.push((BoxProcessor::new(svc), None));
783                }
784                BuilderStep::Multicast { config, steps } => {
785                    // Each top-level step in the multicast scope becomes an independent endpoint.
786                    let mut endpoints = Vec::new();
787                    for step in steps {
788                        let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
789                        let sub_processors: Vec<BoxProcessor> =
790                            sub_pairs.into_iter().map(|(p, _)| p).collect();
791                        let endpoint = compose_pipeline(sub_processors);
792                        endpoints.push(endpoint);
793                    }
794                    let svc = camel_processor::MulticastService::new(endpoints, config);
795                    processors.push((BoxProcessor::new(svc), None));
796                }
797                BuilderStep::DeclarativeLog { level, message } => {
798                    let ValueSourceDef::Expression(expression) = message else {
799                        // Literal case is already converted to a Processor in compile.rs;
800                        // this arm should never be reached for literals.
801                        unreachable!(
802                            "DeclarativeLog with Literal should have been compiled to a Processor"
803                        );
804                    };
805                    let expression = self.compile_language_expression(&expression)?;
806                    let svc =
807                        camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
808                            expression
809                                .evaluate(exchange)
810                                .unwrap_or_else(|e| {
811                                    warn!(error = %e, "log expression evaluation failed");
812                                    Value::Null
813                                })
814                                .to_string()
815                        });
816                    processors.push((BoxProcessor::new(svc), None));
817                }
818                BuilderStep::Bean { name, method } => {
819                    // Lock beans registry to lookup bean
820                    let beans = self.beans.lock().expect(
821                        "beans mutex poisoned: another thread panicked while holding this lock",
822                    );
823
824                    // Lookup bean by name
825                    let bean = beans.get(&name).ok_or_else(|| {
826                        CamelError::ProcessorError(format!("Bean not found: {}", name))
827                    })?;
828
829                    // Clone Arc for async closure (release lock before async)
830                    let bean_clone = Arc::clone(&bean);
831                    let method = method.clone();
832
833                    // Create processor that invokes bean method
834                    let processor = tower::service_fn(move |mut exchange: Exchange| {
835                        let bean = Arc::clone(&bean_clone);
836                        let method = method.clone();
837
838                        async move {
839                            bean.call(&method, &mut exchange).await?;
840                            Ok(exchange)
841                        }
842                    });
843
844                    processors.push((BoxProcessor::new(processor), None));
845                }
846                BuilderStep::Script { language, script } => {
847                    let lang = self.resolve_language(&language)?;
848                    match lang.create_mutating_expression(&script) {
849                        Ok(mut_expr) => {
850                            processors
851                                .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
852                        }
853                        Err(LanguageError::NotSupported {
854                            feature,
855                            language: ref lang_name,
856                        }) => {
857                            // Hard error: the .script() DSL step explicitly requests mutation semantics.
858                            // If the language doesn't support MutatingExpression, the route is mis-configured.
859                            return Err(CamelError::RouteError(format!(
860                                "Language '{}' does not support {} (required for .script() step)",
861                                lang_name, feature
862                            )));
863                        }
864                        Err(e) => {
865                            return Err(CamelError::RouteError(format!(
866                                "Failed to create mutating expression for language '{}': {}",
867                                language, e
868                            )));
869                        }
870                    }
871                }
872                BuilderStep::Throttle { config, steps } => {
873                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
874                    let sub_processors: Vec<BoxProcessor> =
875                        sub_pairs.into_iter().map(|(p, _)| p).collect();
876                    let sub_pipeline = compose_pipeline(sub_processors);
877                    let svc =
878                        camel_processor::throttler::ThrottlerService::new(config, sub_pipeline);
879                    processors.push((BoxProcessor::new(svc), None));
880                }
881                BuilderStep::LoadBalance { config, steps } => {
882                    // Each top-level step in the load_balance scope becomes an independent endpoint.
883                    let mut endpoints = Vec::new();
884                    for step in steps {
885                        let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
886                        let sub_processors: Vec<BoxProcessor> =
887                            sub_pairs.into_iter().map(|(p, _)| p).collect();
888                        let endpoint = compose_pipeline(sub_processors);
889                        endpoints.push(endpoint);
890                    }
891                    let svc =
892                        camel_processor::load_balancer::LoadBalancerService::new(endpoints, config);
893                    processors.push((BoxProcessor::new(svc), None));
894                }
895                BuilderStep::DynamicRouter { config } => {
896                    use camel_api::EndpointResolver;
897
898                    let producer_ctx_clone = producer_ctx.clone();
899                    let registry_clone = Arc::clone(registry);
900                    let resolver: EndpointResolver = Arc::new(move |uri: &str| {
901                        let parsed = match parse_uri(uri) {
902                            Ok(p) => p,
903                            Err(_) => return None,
904                        };
905                        let registry_guard = match registry_clone.lock() {
906                            Ok(g) => g,
907                            Err(_) => return None, // mutex poisoned
908                        };
909                        let component = match registry_guard.get_or_err(&parsed.scheme) {
910                            Ok(c) => c,
911                            Err(_) => return None,
912                        };
913                        let endpoint = match component.create_endpoint(uri) {
914                            Ok(e) => e,
915                            Err(_) => return None,
916                        };
917                        let producer = match endpoint.create_producer(&producer_ctx_clone) {
918                            Ok(p) => p,
919                            Err(_) => return None,
920                        };
921                        Some(BoxProcessor::new(producer))
922                    });
923                    let svc = camel_processor::dynamic_router::DynamicRouterService::new(
924                        config, resolver,
925                    );
926                    processors.push((BoxProcessor::new(svc), None));
927                }
928                BuilderStep::RoutingSlip { config } => {
929                    use camel_api::EndpointResolver;
930
931                    let producer_ctx_clone = producer_ctx.clone();
932                    let registry_clone = registry.clone();
933                    let resolver: EndpointResolver = Arc::new(move |uri: &str| {
934                        let parsed = match parse_uri(uri) {
935                            Ok(p) => p,
936                            Err(_) => return None,
937                        };
938                        let registry_guard = match registry_clone.lock() {
939                            Ok(g) => g,
940                            Err(_) => return None,
941                        };
942                        let component = match registry_guard.get_or_err(&parsed.scheme) {
943                            Ok(c) => c,
944                            Err(_) => return None,
945                        };
946                        let endpoint = match component.create_endpoint(uri) {
947                            Ok(e) => e,
948                            Err(_) => return None,
949                        };
950                        let producer = match endpoint.create_producer(&producer_ctx_clone) {
951                            Ok(p) => p,
952                            Err(_) => return None,
953                        };
954                        Some(BoxProcessor::new(producer))
955                    });
956
957                    let svc =
958                        camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
959                    processors.push((BoxProcessor::new(svc), None));
960                }
961            }
962        }
963        Ok(processors)
964    }
965
966    /// Add a route definition to the controller.
967    ///
968    /// Steps are resolved immediately using the registry.
969    ///
970    /// # Errors
971    ///
972    /// Returns an error if:
973    /// - A route with the same ID already exists
974    /// - Step resolution fails
975    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
976        let route_id = definition.route_id().to_string();
977
978        if self.routes.contains_key(&route_id) {
979            return Err(CamelError::RouteError(format!(
980                "Route '{}' already exists",
981                route_id
982            )));
983        }
984
985        info!(route_id = %route_id, "Adding route to controller");
986
987        // Extract definition info for storage before steps are consumed
988        let definition_info = definition.to_info();
989        let RouteDefinition {
990            from_uri,
991            steps,
992            error_handler,
993            circuit_breaker,
994            unit_of_work,
995            concurrency,
996            ..
997        } = definition;
998
999        // Create ProducerContext from self_ref for step resolution
1000        let producer_ctx = self.build_producer_context()?;
1001
1002        // Take ownership of steps before resolve_steps consumes them
1003        let mut aggregate_split: Option<AggregateSplitInfo> = None;
1004        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
1005            Some((idx, agg_config)) => {
1006                let mut pre_steps = steps;
1007                let mut rest = pre_steps.split_off(idx);
1008                let _agg_step = rest.remove(0);
1009                let post_steps = rest;
1010
1011                let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
1012                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
1013                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
1014                    compose_pipeline(pre_procs),
1015                )));
1016
1017                let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
1018                let post_procs: Vec<BoxProcessor> =
1019                    post_pairs.into_iter().map(|(p, _)| p).collect();
1020                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
1021                    compose_pipeline(post_procs),
1022                )));
1023
1024                aggregate_split = Some(AggregateSplitInfo {
1025                    pre_pipeline,
1026                    agg_config,
1027                    post_pipeline,
1028                });
1029
1030                vec![]
1031            }
1032            None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
1033        };
1034        let route_id_for_tracing = route_id.clone();
1035        let mut pipeline = if processors_with_contracts.is_empty() {
1036            BoxProcessor::new(IdentityProcessor)
1037        } else {
1038            compose_traced_pipeline_with_contracts(
1039                processors_with_contracts,
1040                &route_id_for_tracing,
1041                self.tracing_enabled,
1042                self.tracer_detail_level.clone(),
1043                self.tracer_metrics.clone(),
1044            )
1045        };
1046
1047        // Apply circuit breaker if configured
1048        if let Some(cb_config) = circuit_breaker {
1049            let cb_layer = CircuitBreakerLayer::new(cb_config);
1050            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
1051        }
1052
1053        // Determine which error handler config to use (per-route takes precedence)
1054        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
1055
1056        if let Some(config) = eh_config {
1057            // Lock registry for error handler resolution
1058            let registry = self
1059                .registry
1060                .lock()
1061                .expect("mutex poisoned: another thread panicked while holding this lock");
1062            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
1063            pipeline = BoxProcessor::new(layer.layer(pipeline));
1064        }
1065
1066        // Apply UoW layer outermost (after error handler)
1067        let uow_counter = if let Some(uow_config) = &unit_of_work {
1068            let registry = self
1069                .registry
1070                .lock()
1071                .expect("mutex poisoned: registry lock in add_route uow");
1072            let (uow_layer, counter) =
1073                self.resolve_uow_layer(uow_config, &producer_ctx, &registry, None)?;
1074            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1075            Some(counter)
1076        } else {
1077            None
1078        };
1079
1080        self.routes.insert(
1081            route_id.clone(),
1082            ManagedRoute {
1083                definition: definition_info,
1084                from_uri,
1085                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
1086                concurrency,
1087                consumer_handle: None,
1088                pipeline_handle: None,
1089                consumer_cancel_token: CancellationToken::new(),
1090                pipeline_cancel_token: CancellationToken::new(),
1091                channel_sender: None,
1092                in_flight: uow_counter,
1093                aggregate_split,
1094                agg_service: None,
1095            },
1096        );
1097
1098        Ok(())
1099    }
1100
1101    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting into the route map.
1102    ///
1103    /// Used by hot-reload to prepare a new pipeline for atomic swap without disrupting
1104    /// the running route. The caller is responsible for swapping via `swap_pipeline`.
1105    pub fn compile_route_definition(
1106        &self,
1107        def: RouteDefinition,
1108    ) -> Result<BoxProcessor, CamelError> {
1109        let route_id = def.route_id().to_string();
1110
1111        let producer_ctx = self.build_producer_context()?;
1112
1113        let processors_with_contracts =
1114            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
1115        let mut pipeline = compose_traced_pipeline_with_contracts(
1116            processors_with_contracts,
1117            &route_id,
1118            self.tracing_enabled,
1119            self.tracer_detail_level.clone(),
1120            self.tracer_metrics.clone(),
1121        );
1122
1123        if let Some(cb_config) = def.circuit_breaker {
1124            let cb_layer = CircuitBreakerLayer::new(cb_config);
1125            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
1126        }
1127
1128        let eh_config = def
1129            .error_handler
1130            .clone()
1131            .or_else(|| self.global_error_handler.clone());
1132        if let Some(config) = eh_config {
1133            // Lock registry for error handler resolution
1134            let registry = self
1135                .registry
1136                .lock()
1137                .expect("mutex poisoned: registry lock in compile_route_definition");
1138            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
1139            pipeline = BoxProcessor::new(layer.layer(pipeline));
1140        }
1141
1142        // Apply UoW layer outermost
1143        if let Some(uow_config) = &def.unit_of_work {
1144            let existing_counter = self
1145                .routes
1146                .get(&route_id)
1147                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
1148
1149            let registry = self
1150                .registry
1151                .lock()
1152                .expect("mutex poisoned: registry lock in compile_route_definition uow");
1153
1154            let (uow_layer, _counter) =
1155                self.resolve_uow_layer(uow_config, &producer_ctx, &registry, existing_counter)?;
1156
1157            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1158        }
1159
1160        Ok(pipeline)
1161    }
1162
1163    /// Remove a route from the controller map.
1164    ///
1165    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
1166    /// Returns an error if the route is still running or does not exist.
1167    /// Does not cancel any running tasks — call `stop_route` first.
1168    pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1169        let managed = self.routes.get(route_id).ok_or_else(|| {
1170            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
1171        })?;
1172        if handle_is_running(&managed.consumer_handle)
1173            || handle_is_running(&managed.pipeline_handle)
1174        {
1175            return Err(CamelError::RouteError(format!(
1176                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
1177                route_id,
1178                inferred_lifecycle_label(managed)
1179            )));
1180        }
1181        self.routes.remove(route_id);
1182        info!(route_id = %route_id, "Route removed from controller");
1183        Ok(())
1184    }
1185
1186    /// Returns the number of routes in the controller.
1187    pub fn route_count(&self) -> usize {
1188        self.routes.len()
1189    }
1190
1191    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1192        self.routes.get(route_id).map(|r| {
1193            r.in_flight
1194                .as_ref()
1195                .map_or(0, |c| c.load(Ordering::Relaxed))
1196        })
1197    }
1198
1199    /// Returns `true` if a route with the given ID exists.
1200    pub fn route_exists(&self, route_id: &str) -> bool {
1201        self.routes.contains_key(route_id)
1202    }
1203
1204    /// Returns all route IDs.
1205    pub fn route_ids(&self) -> Vec<String> {
1206        self.routes.keys().cloned().collect()
1207    }
1208
1209    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
1210    pub fn auto_startup_route_ids(&self) -> Vec<String> {
1211        let mut pairs: Vec<(String, i32)> = self
1212            .routes
1213            .iter()
1214            .filter(|(_, managed)| managed.definition.auto_startup())
1215            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1216            .collect();
1217        pairs.sort_by_key(|(_, order)| *order);
1218        pairs.into_iter().map(|(id, _)| id).collect()
1219    }
1220
1221    /// Returns route IDs sorted by shutdown order (startup order descending).
1222    pub fn shutdown_route_ids(&self) -> Vec<String> {
1223        let mut pairs: Vec<(String, i32)> = self
1224            .routes
1225            .iter()
1226            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1227            .collect();
1228        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1229        pairs.into_iter().map(|(id, _)| id).collect()
1230    }
1231
1232    /// Atomically swap the pipeline of a route.
1233    ///
1234    /// In-flight requests finish with the old pipeline (kept alive by Arc).
1235    /// New requests immediately use the new pipeline.
1236    pub fn swap_pipeline(
1237        &self,
1238        route_id: &str,
1239        new_pipeline: BoxProcessor,
1240    ) -> Result<(), CamelError> {
1241        let managed = self
1242            .routes
1243            .get(route_id)
1244            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1245
1246        if managed.aggregate_split.is_some() {
1247            tracing::warn!(
1248                route_id = %route_id,
1249                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
1250            );
1251        }
1252
1253        managed
1254            .pipeline
1255            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1256        info!(route_id = %route_id, "Pipeline swapped atomically");
1257        Ok(())
1258    }
1259
1260    /// Returns the from_uri of a route, if it exists.
1261    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1262        self.routes.get(route_id).map(|r| r.from_uri.clone())
1263    }
1264
1265    /// Get a clone of the current pipeline for a route.
1266    ///
1267    /// This is useful for testing and introspection.
1268    /// Returns `None` if the route doesn't exist.
1269    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1270        self.routes
1271            .get(route_id)
1272            .map(|r| r.pipeline.load().0.clone())
1273    }
1274
1275    /// Internal stop implementation that can set custom status.
1276    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1277        let managed = self
1278            .routes
1279            .get_mut(route_id)
1280            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1281
1282        if !handle_is_running(&managed.consumer_handle)
1283            && !handle_is_running(&managed.pipeline_handle)
1284        {
1285            return Ok(());
1286        }
1287
1288        info!(route_id = %route_id, "Stopping route");
1289
1290        // Cancel both tokens to signal shutdown for consumer and pipeline independently
1291        let managed = self
1292            .routes
1293            .get_mut(route_id)
1294            .expect("invariant: route must exist after prior existence check");
1295        managed.consumer_cancel_token.cancel();
1296
1297        // Aggregator v2: force-complete pending buckets before cancelling pipeline
1298        let managed = self
1299            .routes
1300            .get_mut(route_id)
1301            .expect("invariant: route must exist after prior existence check");
1302        if let Some(agg_svc) = &managed.agg_service {
1303            let guard = agg_svc.lock().unwrap();
1304            guard.force_complete_all();
1305        }
1306
1307        let managed = self
1308            .routes
1309            .get_mut(route_id)
1310            .expect("invariant: route must exist after prior existence check");
1311        managed.pipeline_cancel_token.cancel();
1312
1313        // Take handles directly (no Arc<Mutex> wrapper needed)
1314        let managed = self
1315            .routes
1316            .get_mut(route_id)
1317            .expect("invariant: route must exist after prior existence check");
1318        let consumer_handle = managed.consumer_handle.take();
1319        let pipeline_handle = managed.pipeline_handle.take();
1320
1321        // IMPORTANT: Drop channel_sender early so rx.recv() returns None
1322        // This ensures the pipeline task can exit even if idle on recv()
1323        let managed = self
1324            .routes
1325            .get_mut(route_id)
1326            .expect("invariant: route must exist after prior existence check");
1327        managed.channel_sender = None;
1328
1329        // Wait for tasks to complete with timeout
1330        // The CancellationToken already signaled tasks to stop gracefully.
1331        // Combined with the select! in pipeline loops, this should exit quickly.
1332        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1333            match (consumer_handle, pipeline_handle) {
1334                (Some(c), Some(p)) => {
1335                    let _ = tokio::join!(c, p);
1336                }
1337                (Some(c), None) => {
1338                    let _ = c.await;
1339                }
1340                (None, Some(p)) => {
1341                    let _ = p.await;
1342                }
1343                (None, None) => {}
1344            }
1345        })
1346        .await;
1347
1348        if timeout_result.is_err() {
1349            warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
1350        }
1351
1352        // Get the managed route again (can't hold across await)
1353        let managed = self
1354            .routes
1355            .get_mut(route_id)
1356            .expect("invariant: route must exist after prior existence check");
1357
1358        // Create fresh cancellation tokens for next start
1359        managed.consumer_cancel_token = CancellationToken::new();
1360        managed.pipeline_cancel_token = CancellationToken::new();
1361
1362        info!(route_id = %route_id, "Route stopped");
1363        Ok(())
1364    }
1365}
1366
1367#[async_trait::async_trait]
1368impl RouteController for DefaultRouteController {
1369    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1370        // Check if route exists and can be started.
1371        {
1372            let managed = self
1373                .routes
1374                .get_mut(route_id)
1375                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1376
1377            let consumer_running = handle_is_running(&managed.consumer_handle);
1378            let pipeline_running = handle_is_running(&managed.pipeline_handle);
1379            if consumer_running && pipeline_running {
1380                return Ok(());
1381            }
1382            if !consumer_running && pipeline_running {
1383                return Err(CamelError::RouteError(format!(
1384                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
1385                    route_id
1386                )));
1387            }
1388            if consumer_running && !pipeline_running {
1389                return Err(CamelError::RouteError(format!(
1390                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
1391                    route_id
1392                )));
1393            }
1394        }
1395
1396        info!(route_id = %route_id, "Starting route");
1397
1398        // Get the resolved route info
1399        let (from_uri, pipeline, concurrency) = {
1400            let managed = self
1401                .routes
1402                .get(route_id)
1403                .expect("invariant: route must exist after prior existence check");
1404            (
1405                managed.from_uri.clone(),
1406                Arc::clone(&managed.pipeline),
1407                managed.concurrency.clone(),
1408            )
1409        };
1410
1411        // Clone crash notifier for consumer task
1412        let crash_notifier = self.crash_notifier.clone();
1413        let runtime_for_consumer = self.runtime.clone();
1414
1415        // Parse from URI and create consumer (lock registry for lookup)
1416        let parsed = parse_uri(&from_uri)?;
1417        let registry = self
1418            .registry
1419            .lock()
1420            .expect("mutex poisoned: another thread panicked while holding this lock");
1421        let component = registry.get_or_err(&parsed.scheme)?;
1422        let endpoint = component.create_endpoint(&from_uri)?;
1423        let mut consumer = endpoint.create_consumer()?;
1424        let consumer_concurrency = consumer.concurrency_model();
1425        // Drop the lock before spawning tasks
1426        drop(registry);
1427
1428        // Resolve effective concurrency: route override > consumer default
1429        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1430
1431        // Get the managed route for mutation
1432        let managed = self
1433            .routes
1434            .get_mut(route_id)
1435            .expect("invariant: route must exist after prior existence check");
1436
1437        // Create channel for consumer to send exchanges
1438        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1439        // Create child tokens for independent lifecycle control
1440        let consumer_cancel = managed.consumer_cancel_token.child_token();
1441        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1442        // Clone sender for storage (to reuse on resume)
1443        let tx_for_storage = tx.clone();
1444        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1445
1446        // --- Aggregator v2: check for aggregate route with timeout ---
1447        let managed = self
1448            .routes
1449            .get_mut(route_id)
1450            .expect("invariant: route must exist after prior existence check");
1451
1452        if let Some(split) = managed.aggregate_split.as_ref() {
1453            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
1454
1455            let route_cancel_clone = pipeline_cancel.clone();
1456            let svc = AggregatorService::new(
1457                split.agg_config.clone(),
1458                late_tx,
1459                Arc::clone(&self.languages),
1460                route_cancel_clone,
1461            );
1462            let agg = Arc::new(std::sync::Mutex::new(svc));
1463
1464            managed.agg_service = Some(Arc::clone(&agg));
1465
1466            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
1467            let pre_pipeline = Arc::clone(&split.pre_pipeline);
1468            let post_pipeline = Arc::clone(&split.post_pipeline);
1469
1470            // Spawn consumer task (same as normal route)
1471            let route_id_for_consumer = route_id.to_string();
1472            let consumer_handle = tokio::spawn(async move {
1473                if let Err(e) = consumer.start(consumer_ctx).await {
1474                    error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
1475                    let error_msg = e.to_string();
1476                    if let Some(tx) = crash_notifier {
1477                        let _ = tx
1478                            .send(CrashNotification {
1479                                route_id: route_id_for_consumer.clone(),
1480                                error: error_msg.clone(),
1481                            })
1482                            .await;
1483                    }
1484                    publish_runtime_failure(
1485                        runtime_for_consumer,
1486                        &route_id_for_consumer,
1487                        &error_msg,
1488                    )
1489                    .await;
1490                }
1491            });
1492
1493            // Spawn biased select forward loop
1494            let pipeline_handle = tokio::spawn(async move {
1495                loop {
1496                    tokio::select! {
1497                        biased;
1498
1499                        late_ex = async {
1500                            let mut rx = late_rx.lock().await;
1501                            rx.recv().await
1502                        } => {
1503                            match late_ex {
1504                                Some(ex) => {
1505                                    let pipe = post_pipeline.load();
1506                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
1507                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
1508                                    }
1509                                }
1510                                None => return,
1511                            }
1512                        }
1513
1514                        envelope_opt = rx.recv() => {
1515                            match envelope_opt {
1516                                Some(envelope) => {
1517                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
1518                                    let pre_pipe = pre_pipeline.load();
1519                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
1520                                        Ok(ex) => ex,
1521                                        Err(e) => {
1522                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1523                                            continue;
1524                                        }
1525                                    };
1526
1527                                    let ex = {
1528                                        let cloned_svc = agg.lock().unwrap().clone();
1529                                        cloned_svc.oneshot(ex).await
1530                                    };
1531
1532                                    match ex {
1533                                        Ok(ex) => {
1534                                            if !is_pending(&ex) {
1535                                                let post_pipe = post_pipeline.load();
1536                                                let out = post_pipe.0.clone().oneshot(ex).await;
1537                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
1538                                            } else if let Some(tx) = reply_tx {
1539                                                let _ = tx.send(Ok(ex));
1540                                            }
1541                                        }
1542                                        Err(e) => {
1543                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1544                                        }
1545                                    }
1546                                }
1547                                None => return,
1548                            }
1549                        }
1550
1551                        _ = pipeline_cancel.cancelled() => {
1552                            {
1553                                let guard = agg.lock().unwrap();
1554                                guard.force_complete_all();
1555                            }
1556                            let mut rx_guard = late_rx.lock().await;
1557                            while let Ok(late_ex) = rx_guard.try_recv() {
1558                                let pipe = post_pipeline.load();
1559                                let _ = pipe.0.clone().oneshot(late_ex).await;
1560                            }
1561                            break;
1562                        }
1563                    }
1564                }
1565            });
1566
1567            let managed = self
1568                .routes
1569                .get_mut(route_id)
1570                .expect("invariant: route must exist");
1571            managed.consumer_handle = Some(consumer_handle);
1572            managed.pipeline_handle = Some(pipeline_handle);
1573            managed.channel_sender = Some(tx_for_storage);
1574
1575            info!(route_id = %route_id, "Route started (aggregate with timeout)");
1576            return Ok(());
1577        }
1578        // --- End aggregator v2 branch ---
1579
1580        // Start consumer in background task.
1581        let route_id_for_consumer = route_id.to_string();
1582        let consumer_handle = tokio::spawn(async move {
1583            if let Err(e) = consumer.start(consumer_ctx).await {
1584                error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
1585                let error_msg = e.to_string();
1586
1587                // Send crash notification if notifier is configured
1588                if let Some(tx) = crash_notifier {
1589                    let _ = tx
1590                        .send(CrashNotification {
1591                            route_id: route_id_for_consumer.clone(),
1592                            error: error_msg.clone(),
1593                        })
1594                        .await;
1595                }
1596
1597                publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1598                    .await;
1599            }
1600        });
1601
1602        // Spawn pipeline task with its own cancellation token
1603        let pipeline_handle = match effective_concurrency {
1604            ConcurrencyModel::Sequential => {
1605                tokio::spawn(async move {
1606                    loop {
1607                        // Use select! to exit promptly on cancellation even when idle
1608                        let envelope = tokio::select! {
1609                            envelope = rx.recv() => match envelope {
1610                                Some(e) => e,
1611                                None => return, // Channel closed
1612                            },
1613                            _ = pipeline_cancel.cancelled() => {
1614                                // Cancellation requested - exit gracefully
1615                                return;
1616                            }
1617                        };
1618                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1619
1620                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
1621                        let mut pipeline = pipeline.load().0.clone();
1622
1623                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1624                            if let Some(tx) = reply_tx {
1625                                let _ = tx.send(Err(e));
1626                            }
1627                            return;
1628                        }
1629
1630                        let result = pipeline.call(exchange).await;
1631                        if let Some(tx) = reply_tx {
1632                            let _ = tx.send(result);
1633                        } else if let Err(ref e) = result
1634                            && !matches!(e, CamelError::Stopped)
1635                        {
1636                            error!("Pipeline error: {e}");
1637                        }
1638                    }
1639                })
1640            }
1641            ConcurrencyModel::Concurrent { max } => {
1642                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1643                tokio::spawn(async move {
1644                    loop {
1645                        // Use select! to exit promptly on cancellation even when idle
1646                        let envelope = tokio::select! {
1647                            envelope = rx.recv() => match envelope {
1648                                Some(e) => e,
1649                                None => return, // Channel closed
1650                            },
1651                            _ = pipeline_cancel.cancelled() => {
1652                                // Cancellation requested - exit gracefully
1653                                return;
1654                            }
1655                        };
1656                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1657                        let pipe_ref = Arc::clone(&pipeline);
1658                        let sem = sem.clone();
1659                        let cancel = pipeline_cancel.clone();
1660                        tokio::spawn(async move {
1661                            // Acquire semaphore permit if bounded
1662                            let _permit = match &sem {
1663                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
1664                                None => None,
1665                            };
1666
1667                            // Load current pipeline from ArcSwap
1668                            let mut pipe = pipe_ref.load().0.clone();
1669
1670                            // Wait for service ready with circuit breaker backoff
1671                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1672                                if let Some(tx) = reply_tx {
1673                                    let _ = tx.send(Err(e));
1674                                }
1675                                return;
1676                            }
1677
1678                            let result = pipe.call(exchange).await;
1679                            if let Some(tx) = reply_tx {
1680                                let _ = tx.send(result);
1681                            } else if let Err(ref e) = result
1682                                && !matches!(e, CamelError::Stopped)
1683                            {
1684                                error!("Pipeline error: {e}");
1685                            }
1686                        });
1687                    }
1688                })
1689            }
1690        };
1691
1692        // Store handles and update status
1693        let managed = self
1694            .routes
1695            .get_mut(route_id)
1696            .expect("invariant: route must exist after prior existence check");
1697        managed.consumer_handle = Some(consumer_handle);
1698        managed.pipeline_handle = Some(pipeline_handle);
1699        managed.channel_sender = Some(tx_for_storage);
1700
1701        info!(route_id = %route_id, "Route started");
1702        Ok(())
1703    }
1704
1705    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1706        self.stop_route_internal(route_id).await
1707    }
1708
1709    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1710        self.stop_route(route_id).await?;
1711        tokio::time::sleep(Duration::from_millis(100)).await;
1712        self.start_route(route_id).await
1713    }
1714
1715    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1716        // Check route exists and state.
1717        let managed = self
1718            .routes
1719            .get_mut(route_id)
1720            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1721
1722        let consumer_running = handle_is_running(&managed.consumer_handle);
1723        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1724
1725        // Can only suspend from active started state.
1726        if !consumer_running || !pipeline_running {
1727            return Err(CamelError::RouteError(format!(
1728                "Cannot suspend route '{}' with execution lifecycle {}",
1729                route_id,
1730                inferred_lifecycle_label(managed)
1731            )));
1732        }
1733
1734        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1735
1736        // Cancel consumer token only (keep pipeline running)
1737        let managed = self
1738            .routes
1739            .get_mut(route_id)
1740            .expect("invariant: route must exist after prior existence check");
1741        managed.consumer_cancel_token.cancel();
1742
1743        // Take and join consumer handle
1744        let managed = self
1745            .routes
1746            .get_mut(route_id)
1747            .expect("invariant: route must exist after prior existence check");
1748        let consumer_handle = managed.consumer_handle.take();
1749
1750        // Wait for consumer task to complete with timeout
1751        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1752            if let Some(handle) = consumer_handle {
1753                let _ = handle.await;
1754            }
1755        })
1756        .await;
1757
1758        if timeout_result.is_err() {
1759            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1760        }
1761
1762        // Get the managed route again (can't hold across await)
1763        let managed = self
1764            .routes
1765            .get_mut(route_id)
1766            .expect("invariant: route must exist after prior existence check");
1767
1768        // Create fresh cancellation token for consumer (for resume)
1769        managed.consumer_cancel_token = CancellationToken::new();
1770
1771        info!(route_id = %route_id, "Route suspended (pipeline still running)");
1772        Ok(())
1773    }
1774
1775    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1776        // Check route exists and is Suspended-equivalent execution state.
1777        let managed = self
1778            .routes
1779            .get(route_id)
1780            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1781
1782        let consumer_running = handle_is_running(&managed.consumer_handle);
1783        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1784        if consumer_running || !pipeline_running {
1785            return Err(CamelError::RouteError(format!(
1786                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1787                route_id,
1788                inferred_lifecycle_label(managed)
1789            )));
1790        }
1791
1792        // Get the stored channel sender (must exist for a suspended route)
1793        let sender = managed.channel_sender.clone().ok_or_else(|| {
1794            CamelError::RouteError("Suspended route has no channel sender".into())
1795        })?;
1796
1797        // Get from_uri and concurrency for creating new consumer
1798        let from_uri = managed.from_uri.clone();
1799
1800        info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1801
1802        // Parse from URI and create consumer (lock registry for lookup)
1803        let parsed = parse_uri(&from_uri)?;
1804        let registry = self
1805            .registry
1806            .lock()
1807            .expect("mutex poisoned: another thread panicked while holding this lock");
1808        let component = registry.get_or_err(&parsed.scheme)?;
1809        let endpoint = component.create_endpoint(&from_uri)?;
1810        let mut consumer = endpoint.create_consumer()?;
1811        // Drop the lock before spawning tasks
1812        drop(registry);
1813
1814        // Get the managed route for mutation
1815        let managed = self
1816            .routes
1817            .get_mut(route_id)
1818            .expect("invariant: route must exist after prior existence check");
1819
1820        // Create child token for consumer lifecycle
1821        let consumer_cancel = managed.consumer_cancel_token.child_token();
1822
1823        let crash_notifier = self.crash_notifier.clone();
1824        let runtime_for_consumer = self.runtime.clone();
1825
1826        // Create ConsumerContext with the stored sender
1827        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1828
1829        // Spawn consumer task
1830        let route_id_for_consumer = route_id.to_string();
1831        let consumer_handle = tokio::spawn(async move {
1832            if let Err(e) = consumer.start(consumer_ctx).await {
1833                error!(route_id = %route_id_for_consumer, "Consumer error on resume: {e}");
1834                let error_msg = e.to_string();
1835
1836                // Send crash notification if notifier is configured
1837                if let Some(tx) = crash_notifier {
1838                    let _ = tx
1839                        .send(CrashNotification {
1840                            route_id: route_id_for_consumer.clone(),
1841                            error: error_msg.clone(),
1842                        })
1843                        .await;
1844                }
1845
1846                publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1847                    .await;
1848            }
1849        });
1850
1851        // Store consumer handle and update status
1852        let managed = self
1853            .routes
1854            .get_mut(route_id)
1855            .expect("invariant: route must exist after prior existence check");
1856        managed.consumer_handle = Some(consumer_handle);
1857
1858        info!(route_id = %route_id, "Route resumed");
1859        Ok(())
1860    }
1861
1862    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1863        // Only start routes where auto_startup() == true
1864        // Sort by startup_order() ascending before starting
1865        let route_ids: Vec<String> = {
1866            let mut pairs: Vec<_> = self
1867                .routes
1868                .iter()
1869                .filter(|(_, r)| r.definition.auto_startup())
1870                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1871                .collect();
1872            pairs.sort_by_key(|(_, order)| *order);
1873            pairs.into_iter().map(|(id, _)| id).collect()
1874        };
1875
1876        info!("Starting {} auto-startup routes", route_ids.len());
1877
1878        // Collect errors but continue starting remaining routes
1879        let mut errors: Vec<String> = Vec::new();
1880        for route_id in route_ids {
1881            if let Err(e) = self.start_route(&route_id).await {
1882                errors.push(format!("Route '{}': {}", route_id, e));
1883            }
1884        }
1885
1886        if !errors.is_empty() {
1887            return Err(CamelError::RouteError(format!(
1888                "Failed to start routes: {}",
1889                errors.join(", ")
1890            )));
1891        }
1892
1893        info!("All auto-startup routes started");
1894        Ok(())
1895    }
1896
1897    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1898        // Sort by startup_order descending (reverse order)
1899        let route_ids: Vec<String> = {
1900            let mut pairs: Vec<_> = self
1901                .routes
1902                .iter()
1903                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1904                .collect();
1905            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1906            pairs.into_iter().map(|(id, _)| id).collect()
1907        };
1908
1909        info!("Stopping {} routes", route_ids.len());
1910
1911        for route_id in route_ids {
1912            let _ = self.stop_route(&route_id).await;
1913        }
1914
1915        info!("All routes stopped");
1916        Ok(())
1917    }
1918}
1919
1920#[async_trait::async_trait]
1921impl RouteControllerInternal for DefaultRouteController {
1922    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
1923        DefaultRouteController::add_route(self, def)
1924    }
1925
1926    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError> {
1927        DefaultRouteController::swap_pipeline(self, route_id, pipeline)
1928    }
1929
1930    fn route_from_uri(&self, route_id: &str) -> Option<String> {
1931        // Call the inherent method which now returns Option<String>
1932        DefaultRouteController::route_from_uri(self, route_id)
1933    }
1934
1935    fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
1936        DefaultRouteController::set_error_handler(self, config)
1937    }
1938
1939    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
1940        DefaultRouteController::set_self_ref(self, self_ref)
1941    }
1942
1943    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
1944        DefaultRouteController::set_runtime_handle(self, runtime)
1945    }
1946
1947    fn route_count(&self) -> usize {
1948        DefaultRouteController::route_count(self)
1949    }
1950
1951    fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1952        DefaultRouteController::in_flight_count(self, route_id)
1953    }
1954
1955    fn route_exists(&self, route_id: &str) -> bool {
1956        DefaultRouteController::route_exists(self, route_id)
1957    }
1958
1959    fn route_ids(&self) -> Vec<String> {
1960        DefaultRouteController::route_ids(self)
1961    }
1962
1963    fn auto_startup_route_ids(&self) -> Vec<String> {
1964        DefaultRouteController::auto_startup_route_ids(self)
1965    }
1966
1967    fn shutdown_route_ids(&self) -> Vec<String> {
1968        DefaultRouteController::shutdown_route_ids(self)
1969    }
1970
1971    fn set_tracer_config(&mut self, config: &TracerConfig) {
1972        DefaultRouteController::set_tracer_config(self, config)
1973    }
1974
1975    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError> {
1976        DefaultRouteController::compile_route_definition(self, def)
1977    }
1978
1979    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1980        DefaultRouteController::remove_route(self, route_id)
1981    }
1982
1983    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1984        DefaultRouteController::start_route(self, route_id).await
1985    }
1986
1987    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1988        DefaultRouteController::stop_route(self, route_id).await
1989    }
1990}
1991
1992#[cfg(test)]
1993mod tests {
1994    use super::*;
1995    use crate::shared::components::domain::Registry;
1996
1997    fn build_controller() -> DefaultRouteController {
1998        DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())))
1999    }
2000
2001    fn build_controller_with_components() -> DefaultRouteController {
2002        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2003        {
2004            let mut guard = registry.lock().expect("registry lock");
2005            guard.register(camel_component_timer::TimerComponent::new());
2006            guard.register(camel_component_mock::MockComponent::new());
2007            guard.register(camel_component_log::LogComponent::new());
2008        }
2009        DefaultRouteController::new(registry)
2010    }
2011
2012    fn set_self_ref(controller: &mut DefaultRouteController) {
2013        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2014        let other: Arc<Mutex<dyn RouteController>> =
2015            Arc::new(Mutex::new(DefaultRouteController::new(registry)));
2016        controller.set_self_ref(other);
2017    }
2018
2019    fn register_simple_language(controller: &mut DefaultRouteController) {
2020        controller.languages.lock().expect("languages lock").insert(
2021            "simple".into(),
2022            Arc::new(camel_language_simple::SimpleLanguage),
2023        );
2024    }
2025
2026    #[test]
2027    fn test_route_controller_internal_is_object_safe() {
2028        let _: Option<Box<dyn RouteControllerInternal>> = None;
2029    }
2030
2031    #[test]
2032    fn helper_functions_cover_non_async_branches() {
2033        let managed = ManagedRoute {
2034            definition: RouteDefinition::new("timer:a", vec![])
2035                .with_route_id("r")
2036                .to_info(),
2037            from_uri: "timer:a".into(),
2038            pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(BoxProcessor::new(
2039                IdentityProcessor,
2040            )))),
2041            concurrency: None,
2042            consumer_handle: None,
2043            pipeline_handle: None,
2044            consumer_cancel_token: CancellationToken::new(),
2045            pipeline_cancel_token: CancellationToken::new(),
2046            channel_sender: None,
2047            in_flight: None,
2048            aggregate_split: None,
2049            agg_service: None,
2050        };
2051
2052        assert_eq!(inferred_lifecycle_label(&managed), "Stopped");
2053        assert!(!handle_is_running(&managed.consumer_handle));
2054
2055        let cmd = runtime_failure_command("route-x", "boom");
2056        match cmd {
2057            RuntimeCommand::FailRoute {
2058                route_id, error, ..
2059            } => {
2060                assert_eq!(route_id, "route-x");
2061                assert_eq!(error, "boom");
2062            }
2063            _ => panic!("expected FailRoute command"),
2064        }
2065    }
2066
2067    #[test]
2068    fn add_route_detects_duplicates() {
2069        let mut controller = build_controller();
2070        set_self_ref(&mut controller);
2071
2072        controller
2073            .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
2074            .expect("add route");
2075
2076        let dup_err = controller
2077            .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
2078            .expect_err("duplicate must fail");
2079        assert!(dup_err.to_string().contains("already exists"));
2080    }
2081
2082    #[test]
2083    fn route_introspection_and_ordering_helpers_work() {
2084        let mut controller = build_controller();
2085        set_self_ref(&mut controller);
2086
2087        controller
2088            .add_route(
2089                RouteDefinition::new("timer:a", vec![])
2090                    .with_route_id("a")
2091                    .with_startup_order(20),
2092            )
2093            .unwrap();
2094        controller
2095            .add_route(
2096                RouteDefinition::new("timer:b", vec![])
2097                    .with_route_id("b")
2098                    .with_startup_order(10),
2099            )
2100            .unwrap();
2101        controller
2102            .add_route(
2103                RouteDefinition::new("timer:c", vec![])
2104                    .with_route_id("c")
2105                    .with_auto_startup(false)
2106                    .with_startup_order(5),
2107            )
2108            .unwrap();
2109
2110        assert_eq!(controller.route_count(), 3);
2111        assert_eq!(controller.route_from_uri("a"), Some("timer:a".into()));
2112        assert!(controller.route_ids().contains(&"a".to_string()));
2113        assert_eq!(
2114            controller.auto_startup_route_ids(),
2115            vec!["b".to_string(), "a".to_string()]
2116        );
2117        assert_eq!(
2118            controller.shutdown_route_ids(),
2119            vec!["a".to_string(), "b".to_string(), "c".to_string()]
2120        );
2121    }
2122
2123    #[test]
2124    fn swap_pipeline_and_remove_route_behaviors() {
2125        let mut controller = build_controller();
2126        set_self_ref(&mut controller);
2127
2128        controller
2129            .add_route(RouteDefinition::new("timer:a", vec![]).with_route_id("swap"))
2130            .unwrap();
2131
2132        controller
2133            .swap_pipeline("swap", BoxProcessor::new(IdentityProcessor))
2134            .unwrap();
2135        assert!(controller.get_pipeline("swap").is_some());
2136
2137        controller.remove_route("swap").unwrap();
2138        assert_eq!(controller.route_count(), 0);
2139
2140        let err = controller
2141            .remove_route("swap")
2142            .expect_err("missing route must fail");
2143        assert!(err.to_string().contains("not found"));
2144    }
2145
2146    #[test]
2147    fn resolve_steps_covers_declarative_and_eip_variants() {
2148        use camel_api::LanguageExpressionDef;
2149        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2150
2151        let mut controller = build_controller_with_components();
2152        set_self_ref(&mut controller);
2153        register_simple_language(&mut controller);
2154
2155        let expr = |source: &str| LanguageExpressionDef {
2156            language: "simple".into(),
2157            source: source.into(),
2158        };
2159
2160        let steps = vec![
2161            BuilderStep::To("mock:out".into()),
2162            BuilderStep::Stop,
2163            BuilderStep::Log {
2164                level: camel_processor::LogLevel::Info,
2165                message: "log".into(),
2166            },
2167            BuilderStep::DeclarativeSetHeader {
2168                key: "k".into(),
2169                value: ValueSourceDef::Literal(Value::String("v".into())),
2170            },
2171            BuilderStep::DeclarativeSetHeader {
2172                key: "k2".into(),
2173                value: ValueSourceDef::Expression(expr("${body}")),
2174            },
2175            BuilderStep::DeclarativeSetBody {
2176                value: ValueSourceDef::Expression(expr("${body}")),
2177            },
2178            BuilderStep::DeclarativeFilter {
2179                predicate: expr("${body} != null"),
2180                steps: vec![BuilderStep::Stop],
2181            },
2182            BuilderStep::DeclarativeChoice {
2183                whens: vec![
2184                    crate::lifecycle::application::route_definition::DeclarativeWhenStep {
2185                        predicate: expr("${body} == 'x'"),
2186                        steps: vec![BuilderStep::Stop],
2187                    },
2188                ],
2189                otherwise: Some(vec![BuilderStep::Stop]),
2190            },
2191            BuilderStep::DeclarativeScript {
2192                expression: expr("${body}"),
2193            },
2194            BuilderStep::Split {
2195                config: SplitterConfig::new(split_body_lines())
2196                    .aggregation(AggregationStrategy::CollectAll),
2197                steps: vec![BuilderStep::Stop],
2198            },
2199            BuilderStep::DeclarativeSplit {
2200                expression: expr("${body}"),
2201                aggregation: AggregationStrategy::Original,
2202                parallel: false,
2203                parallel_limit: Some(2),
2204                stop_on_exception: true,
2205                steps: vec![BuilderStep::Stop],
2206            },
2207            BuilderStep::Aggregate {
2208                config: camel_api::AggregatorConfig::correlate_by("id")
2209                    .complete_when_size(1)
2210                    .build(),
2211            },
2212            BuilderStep::Filter {
2213                predicate: Arc::new(|_| true),
2214                steps: vec![BuilderStep::Stop],
2215            },
2216            BuilderStep::Choice {
2217                whens: vec![crate::lifecycle::application::route_definition::WhenStep {
2218                    predicate: Arc::new(|_| true),
2219                    steps: vec![BuilderStep::Stop],
2220                }],
2221                otherwise: Some(vec![BuilderStep::Stop]),
2222            },
2223            BuilderStep::WireTap {
2224                uri: "mock:tap".into(),
2225            },
2226            BuilderStep::Multicast {
2227                steps: vec![
2228                    BuilderStep::To("mock:m1".into()),
2229                    BuilderStep::To("mock:m2".into()),
2230                ],
2231                config: camel_api::MulticastConfig::new(),
2232            },
2233            BuilderStep::DeclarativeLog {
2234                level: camel_processor::LogLevel::Info,
2235                message: ValueSourceDef::Expression(expr("${body}")),
2236            },
2237            BuilderStep::Throttle {
2238                config: camel_api::ThrottlerConfig::new(10, Duration::from_millis(100)),
2239                steps: vec![BuilderStep::To("mock:t".into())],
2240            },
2241            BuilderStep::LoadBalance {
2242                config: camel_api::LoadBalancerConfig::round_robin(),
2243                steps: vec![
2244                    BuilderStep::To("mock:l1".into()),
2245                    BuilderStep::To("mock:l2".into()),
2246                ],
2247            },
2248            BuilderStep::DynamicRouter {
2249                config: camel_api::DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
2250            },
2251            BuilderStep::RoutingSlip {
2252                config: camel_api::RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
2253            },
2254        ];
2255
2256        let producer_ctx = ProducerContext::new();
2257        let resolved = controller
2258            .resolve_steps(steps, &producer_ctx, &controller.registry)
2259            .expect("resolve should succeed");
2260        assert!(!resolved.is_empty());
2261    }
2262
2263    #[test]
2264    fn resolve_steps_script_requires_mutating_language_support() {
2265        use camel_api::LanguageExpressionDef;
2266
2267        let mut controller = build_controller_with_components();
2268        set_self_ref(&mut controller);
2269        register_simple_language(&mut controller);
2270
2271        let steps = vec![BuilderStep::Script {
2272            language: "simple".into(),
2273            script: "${body}".into(),
2274        }];
2275
2276        let err = controller
2277            .resolve_steps(steps, &ProducerContext::new(), &controller.registry)
2278            .expect_err("simple script should fail for mutating expression");
2279        assert!(err.to_string().contains("does not support"));
2280
2281        let bean_missing = vec![BuilderStep::Bean {
2282            name: "unknown".into(),
2283            method: "run".into(),
2284        }];
2285        let bean_err = controller
2286            .resolve_steps(bean_missing, &ProducerContext::new(), &controller.registry)
2287            .expect_err("missing bean must fail");
2288        assert!(bean_err.to_string().contains("Bean not found"));
2289
2290        let bad_declarative = vec![BuilderStep::DeclarativeScript {
2291            expression: LanguageExpressionDef {
2292                language: "unknown".into(),
2293                source: "x".into(),
2294            },
2295        }];
2296        let lang_err = controller
2297            .resolve_steps(
2298                bad_declarative,
2299                &ProducerContext::new(),
2300                &controller.registry,
2301            )
2302            .expect_err("unknown language must fail");
2303        assert!(lang_err.to_string().contains("not registered"));
2304    }
2305
2306    #[tokio::test]
2307    async fn lifecycle_methods_report_missing_routes() {
2308        let mut controller = build_controller();
2309
2310        assert!(controller.start_route("missing").await.is_err());
2311        assert!(controller.stop_route("missing").await.is_err());
2312        assert!(controller.suspend_route("missing").await.is_err());
2313        assert!(controller.resume_route("missing").await.is_err());
2314    }
2315
2316    #[tokio::test]
2317    async fn start_stop_route_happy_path_with_timer_and_mock() {
2318        let mut controller = build_controller_with_components();
2319        set_self_ref(&mut controller);
2320
2321        let route = RouteDefinition::new(
2322            "timer:tick?period=10&repeatCount=1",
2323            vec![BuilderStep::To("mock:out".into())],
2324        )
2325        .with_route_id("rt-1");
2326        controller.add_route(route).unwrap();
2327
2328        controller.start_route("rt-1").await.unwrap();
2329        tokio::time::sleep(Duration::from_millis(40)).await;
2330        controller.stop_route("rt-1").await.unwrap();
2331
2332        controller.remove_route("rt-1").unwrap();
2333    }
2334
2335    #[tokio::test]
2336    async fn suspend_resume_and_restart_cover_execution_transitions() {
2337        let mut controller = build_controller_with_components();
2338        set_self_ref(&mut controller);
2339
2340        let route = RouteDefinition::new(
2341            "timer:tick?period=30",
2342            vec![BuilderStep::To("mock:out".into())],
2343        )
2344        .with_route_id("rt-2");
2345        controller.add_route(route).unwrap();
2346
2347        controller.start_route("rt-2").await.unwrap();
2348        controller.suspend_route("rt-2").await.unwrap();
2349        controller.resume_route("rt-2").await.unwrap();
2350        controller.restart_route("rt-2").await.unwrap();
2351        controller.stop_route("rt-2").await.unwrap();
2352    }
2353
2354    #[tokio::test]
2355    async fn remove_route_rejects_running_route() {
2356        let mut controller = build_controller_with_components();
2357        set_self_ref(&mut controller);
2358
2359        let route = RouteDefinition::new(
2360            "timer:tick?period=25",
2361            vec![BuilderStep::To("mock:out".into())],
2362        )
2363        .with_route_id("rt-running");
2364        controller.add_route(route).unwrap();
2365        controller.start_route("rt-running").await.unwrap();
2366
2367        let err = controller
2368            .remove_route("rt-running")
2369            .expect_err("running route removal must fail");
2370        assert!(err.to_string().contains("must be stopped before removal"));
2371
2372        controller.stop_route("rt-running").await.unwrap();
2373        controller.remove_route("rt-running").unwrap();
2374    }
2375
2376    #[tokio::test]
2377    async fn start_route_on_suspended_state_returns_guidance_error() {
2378        let mut controller = build_controller_with_components();
2379        set_self_ref(&mut controller);
2380
2381        let route = RouteDefinition::new(
2382            "timer:tick?period=40",
2383            vec![BuilderStep::To("mock:out".into())],
2384        )
2385        .with_route_id("rt-suspend");
2386        controller.add_route(route).unwrap();
2387
2388        controller.start_route("rt-suspend").await.unwrap();
2389        controller.suspend_route("rt-suspend").await.unwrap();
2390
2391        let err = controller
2392            .start_route("rt-suspend")
2393            .await
2394            .expect_err("start from suspended must fail");
2395        assert!(err.to_string().contains("use resume_route"));
2396
2397        controller.resume_route("rt-suspend").await.unwrap();
2398        controller.stop_route("rt-suspend").await.unwrap();
2399    }
2400
2401    #[tokio::test]
2402    async fn suspend_and_resume_validate_execution_state() {
2403        let mut controller = build_controller_with_components();
2404        set_self_ref(&mut controller);
2405
2406        controller
2407            .add_route(
2408                RouteDefinition::new("timer:tick?period=50", vec![]).with_route_id("rt-state"),
2409            )
2410            .unwrap();
2411
2412        let suspend_err = controller
2413            .suspend_route("rt-state")
2414            .await
2415            .expect_err("suspend before start must fail");
2416        assert!(suspend_err.to_string().contains("Cannot suspend route"));
2417
2418        controller.start_route("rt-state").await.unwrap();
2419        let resume_err = controller
2420            .resume_route("rt-state")
2421            .await
2422            .expect_err("resume while started must fail");
2423        assert!(resume_err.to_string().contains("Cannot resume route"));
2424
2425        controller.stop_route("rt-state").await.unwrap();
2426    }
2427
2428    #[tokio::test]
2429    async fn concurrent_concurrency_override_path_executes() {
2430        let mut controller = build_controller_with_components();
2431        set_self_ref(&mut controller);
2432
2433        let route = RouteDefinition::new(
2434            "timer:tick?period=10&repeatCount=2",
2435            vec![BuilderStep::To("mock:out".into())],
2436        )
2437        .with_route_id("rt-concurrent")
2438        .with_concurrency(ConcurrencyModel::Concurrent { max: Some(2) });
2439
2440        controller.add_route(route).unwrap();
2441        controller.start_route("rt-concurrent").await.unwrap();
2442        tokio::time::sleep(Duration::from_millis(50)).await;
2443        controller.stop_route("rt-concurrent").await.unwrap();
2444    }
2445
2446    #[tokio::test]
2447    async fn add_route_with_circuit_breaker_and_error_handler_compiles() {
2448        use camel_api::circuit_breaker::CircuitBreakerConfig;
2449        use camel_api::error_handler::ErrorHandlerConfig;
2450
2451        let mut controller = build_controller_with_components();
2452        set_self_ref(&mut controller);
2453
2454        let route = RouteDefinition::new("timer:tick?period=25", vec![BuilderStep::Stop])
2455            .with_route_id("rt-eh")
2456            .with_circuit_breaker(CircuitBreakerConfig::new())
2457            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"));
2458
2459        controller
2460            .add_route(route)
2461            .expect("route with layers should compile");
2462        controller.start_route("rt-eh").await.unwrap();
2463        controller.stop_route("rt-eh").await.unwrap();
2464    }
2465
2466    #[tokio::test]
2467    async fn compile_and_swap_errors_for_missing_route() {
2468        let mut controller = build_controller_with_components();
2469        set_self_ref(&mut controller);
2470
2471        let compiled = controller
2472            .compile_route_definition(
2473                RouteDefinition::new("timer:tick?period=10", vec![BuilderStep::Stop])
2474                    .with_route_id("compiled"),
2475            )
2476            .expect("compile should work");
2477
2478        let err = controller
2479            .swap_pipeline("nope", compiled)
2480            .expect_err("missing route swap must fail");
2481        assert!(err.to_string().contains("not found"));
2482    }
2483}