Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::{Mutex, mpsc};
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::error_handler::ErrorHandlerConfig;
18use camel_api::metrics::MetricsCollector;
19use camel_api::{
20    BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProducerContext,
21    RouteController, RuntimeCommand, RuntimeHandle, Value, body::Body,
22};
23use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
24use camel_endpoint::parse_uri;
25use camel_language_api::{Expression, Language, LanguageError, Predicate};
26use camel_processor::circuit_breaker::CircuitBreakerLayer;
27use camel_processor::error_handler::ErrorHandlerLayer;
28use camel_processor::script_mutator::ScriptMutator;
29use camel_processor::{ChoiceService, WhenClause};
30
31use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
32use crate::lifecycle::adapters::route_compiler::{
33    compose_pipeline, compose_traced_pipeline_with_contracts,
34};
35use crate::lifecycle::application::route_definition::{
36    BuilderStep, LanguageExpressionDef, RouteDefinition, RouteDefinitionInfo, ValueSourceDef,
37};
38use crate::shared::components::domain::Registry;
39use crate::shared::observability::domain::{DetailLevel, TracerConfig};
40use arc_swap::ArcSwap;
41use camel_bean::BeanRegistry;
42
/// Notification sent when a route crashes.
///
/// Used by [`SupervisingRouteController`](crate::supervising_route_controller::SupervisingRouteController)
/// to monitor and restart failed routes. The notification travels over the
/// `mpsc::Sender<CrashNotification>` registered through
/// [`DefaultRouteController::set_crash_notifier`].
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// The ID of the crashed route.
    pub route_id: String,
    /// The error that caused the crash, carried as plain text rather than as a
    /// typed error value.
    pub error: String,
}
54
/// Newtype to make `BoxProcessor` Sync-safe for `ArcSwap`.
///
/// # Safety
///
/// `BoxProcessor` (`BoxCloneService`) is `Send` but not `Sync` because the inner
/// `Box<dyn CloneServiceInner>` lacks a `Sync` bound. However:
///
/// 1. We ONLY access `BoxProcessor` via `clone()`, which is a read-only operation
///    (creates a new boxed service from the inner clone).
/// 2. The clone is owned by the calling thread and never shared.
/// 3. `ArcSwap` guarantees we only get `&` references (no `&mut`).
///
/// Therefore, concurrent access to `&BoxProcessor` for cloning is safe because
/// `clone()` does not mutate shared state and each thread gets an independent copy.
///
/// NOTE(review): this argument holds only while every wrapped service's `Clone`
/// impl is itself free of unsynchronised interior mutability (e.g. `Rc`/`Cell`
/// state touched during `clone`). Nothing in the type system enforces that —
/// confirm for each processor that can end up inside a pipeline.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: the wrapped `BoxProcessor` is only ever reached through `&self` in
// order to call `clone()`; see the `# Safety` section on `SyncBoxProcessor`
// for the full reasoning and its caveat.
unsafe impl Sync for SyncBoxProcessor {}
71
/// A route's processor pipeline wrapped in an `ArcSwap` and shared through an
/// `Arc`. `ManagedRoute` stores its pipeline in this form "for atomic swap";
/// presumably this is what `RouteControllerInternal::swap_pipeline` relies on —
/// TODO confirm against the implementation.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
/// Language implementations keyed by language name and guarded by a
/// `std::sync::Mutex`. An externally owned registry can be handed to the
/// controller through `DefaultRouteController::with_languages`.
pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
74
/// Internal trait extending [`RouteController`] with methods needed by [`CamelContext`]
/// that are not part of the public lifecycle API.
///
/// Both [`DefaultRouteController`] and the future `SupervisingRouteController` implement
/// this trait, allowing `CamelContext` to hold either as `Arc<Mutex<dyn RouteControllerInternal>>`.
///
/// The trait is annotated with `async_trait` because it declares the two
/// `async` reload methods at the bottom; all other methods are synchronous.
#[async_trait::async_trait]
pub trait RouteControllerInternal: RouteController + Send {
    /// Add a route definition to the controller.
    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError>;

    /// Atomically swap the pipeline of a running route (for hot-reload).
    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError>;

    /// Returns the `from_uri` of a route by ID.
    fn route_from_uri(&self, route_id: &str) -> Option<String>;

    /// Set a global error handler applied to all routes.
    fn set_error_handler(&mut self, config: ErrorHandlerConfig);

    /// Set the self-reference needed to create `ProducerContext`.
    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>);

    /// Set runtime handle for ProducerContext command/query access.
    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>);

    /// Returns the number of routes in the controller.
    fn route_count(&self) -> usize;

    /// Returns the current in-flight count for a route, or `None` if route not found.
    ///
    /// NOTE(review): `ManagedRoute::in_flight` is also `None` when no unit of
    /// work is configured for the route — confirm whether implementations
    /// report that case as `None` as well.
    fn in_flight_count(&self, route_id: &str) -> Option<u64>;

    /// Returns `true` if a route with the given ID exists.
    fn route_exists(&self, route_id: &str) -> bool;

    /// Returns all route IDs.
    fn route_ids(&self) -> Vec<String>;

    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
    fn auto_startup_route_ids(&self) -> Vec<String>;

    /// Returns route IDs sorted by shutdown order (startup order descending).
    fn shutdown_route_ids(&self) -> Vec<String>;

    /// Configure tracing from a [`TracerConfig`].
    fn set_tracer_config(&mut self, config: &TracerConfig);

    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting it into the route map.
    /// Used by hot-reload to prepare a new pipeline for atomic swap.
    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError>;

    /// Remove a route from the controller map (route must be stopped first).
    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Start a route by ID (for use by hot-reload, where async_trait is required).
    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Stop a route by ID (for use by hot-reload, where async_trait is required).
    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
}
134
135/// Internal state for a managed route.
136struct ManagedRoute {
137    /// The route definition metadata (for introspection).
138    definition: RouteDefinitionInfo,
139    /// Source endpoint URI.
140    from_uri: String,
141    /// Resolved processor pipeline (wrapped for atomic swap).
142    pipeline: SharedPipeline,
143    /// Concurrency model override (if any).
144    concurrency: Option<ConcurrencyModel>,
145    /// Handle for the consumer task (if running).
146    consumer_handle: Option<JoinHandle<()>>,
147    /// Handle for the pipeline task (if running).
148    pipeline_handle: Option<JoinHandle<()>>,
149    /// Cancellation token for stopping the consumer task.
150    /// This allows independent control of the consumer lifecycle (for suspend/resume).
151    consumer_cancel_token: CancellationToken,
152    /// Cancellation token for stopping the pipeline task.
153    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
154    pipeline_cancel_token: CancellationToken,
155    /// Channel sender for sending exchanges to the pipeline.
156    /// Stored to allow resuming a suspended route without recreating the channel.
157    channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
158    /// In-flight exchange counter. `None` when UoW is not configured for this route.
159    in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
160}
161
162fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
163    handle.as_ref().is_some_and(|h| !h.is_finished())
164}
165
166fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
167    match (
168        handle_is_running(&managed.consumer_handle),
169        handle_is_running(&managed.pipeline_handle),
170    ) {
171        (true, true) => "Started",
172        (false, true) => "Suspended",
173        (true, false) => "Stopping",
174        (false, false) => "Stopped",
175    }
176}
177
178/// Wait for a pipeline service to be ready with circuit breaker backoff.
179///
180/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
181/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
182/// cancellation checks. It returns `Ok(())` when the service is ready, or
183/// `Err(e)` if cancellation occurred or a fatal error was encountered.
184async fn ready_with_backoff(
185    pipeline: &mut BoxProcessor,
186    cancel: &CancellationToken,
187) -> Result<(), CamelError> {
188    loop {
189        match pipeline.ready().await {
190            Ok(_) => return Ok(()),
191            Err(CamelError::CircuitOpen(ref msg)) => {
192                warn!("Circuit open, backing off: {msg}");
193                tokio::select! {
194                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
195                        continue;
196                    }
197                    _ = cancel.cancelled() => {
198                        // Shutting down — don't retry.
199                        return Err(CamelError::CircuitOpen(msg.clone()));
200                    }
201                }
202            }
203            Err(e) => {
204                error!("Pipeline not ready: {e}");
205                return Err(e);
206            }
207        }
208    }
209}
210
211fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
212    let stamp = std::time::SystemTime::now()
213        .duration_since(std::time::UNIX_EPOCH)
214        .unwrap_or_default()
215        .as_nanos();
216    RuntimeCommand::FailRoute {
217        route_id: route_id.to_string(),
218        error: error.to_string(),
219        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
220        causation_id: None,
221    }
222}
223
224async fn publish_runtime_failure(
225    runtime: Option<Weak<dyn RuntimeHandle>>,
226    route_id: &str,
227    error: &str,
228) {
229    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
230        return;
231    };
232    let command = runtime_failure_command(route_id, error);
233    if let Err(runtime_error) = runtime.execute(command).await {
234        warn!(
235            route_id = %route_id,
236            error = %runtime_error,
237            "failed to synchronize route crash with runtime projection"
238        );
239    }
240}
241
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
///
/// The registries are guarded by `std::sync::Mutex`, while the self-reference
/// uses the async `tokio::sync::Mutex`.
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Self-reference for creating ProducerContext.
    /// Set after construction via `set_self_ref()`; also handed out by
    /// `self_ref_for_supervision()`.
    self_ref: Option<Arc<Mutex<dyn RouteController>>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    /// Held as a `Weak`: `set_runtime_handle()` downgrades the `Arc` it is
    /// given, so the controller does not keep the runtime alive and every use
    /// must upgrade first.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines. Defaults to `false`.
    tracing_enabled: bool,
    /// Detail level for tracing when enabled. Defaults to `DetailLevel::Minimal`.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
}
274
275impl DefaultRouteController {
276    /// Create a new `DefaultRouteController` with the given registry.
277    pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
278        Self::with_beans(
279            registry,
280            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
281        )
282    }
283
284    /// Create a new `DefaultRouteController` with shared bean registry.
285    pub fn with_beans(
286        registry: Arc<std::sync::Mutex<Registry>>,
287        beans: Arc<std::sync::Mutex<BeanRegistry>>,
288    ) -> Self {
289        Self {
290            routes: HashMap::new(),
291            registry,
292            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
293            beans,
294            self_ref: None,
295            runtime: None,
296            global_error_handler: None,
297            crash_notifier: None,
298            tracing_enabled: false,
299            tracer_detail_level: DetailLevel::Minimal,
300            tracer_metrics: None,
301        }
302    }
303
304    /// Create a new `DefaultRouteController` with shared language registry.
305    pub fn with_languages(
306        registry: Arc<std::sync::Mutex<Registry>>,
307        languages: SharedLanguageRegistry,
308    ) -> Self {
309        Self {
310            routes: HashMap::new(),
311            registry,
312            languages,
313            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
314            self_ref: None,
315            runtime: None,
316            global_error_handler: None,
317            crash_notifier: None,
318            tracing_enabled: false,
319            tracer_detail_level: DetailLevel::Minimal,
320            tracer_metrics: None,
321        }
322    }
323
324    /// Set the self-reference for creating ProducerContext.
325    ///
326    /// This must be called after wrapping the controller in `Arc<Mutex<>>`.
327    pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
328        self.self_ref = Some(self_ref);
329    }
330
331    /// Set runtime handle for ProducerContext creation.
332    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
333        self.runtime = Some(Arc::downgrade(&runtime));
334    }
335
336    /// Get the self-reference, if set.
337    ///
338    /// Used by [`SupervisingRouteController`](crate::supervising_route_controller::SupervisingRouteController)
339    /// to spawn the supervision loop.
340    pub fn self_ref_for_supervision(&self) -> Option<Arc<Mutex<dyn RouteController>>> {
341        self.self_ref.clone()
342    }
343
344    /// Get runtime handle for supervision-triggered lifecycle commands, if set.
345    pub fn runtime_handle_for_supervision(&self) -> Option<Arc<dyn RuntimeHandle>> {
346        self.runtime.as_ref().and_then(Weak::upgrade)
347    }
348
349    /// Set the crash notifier for supervision.
350    ///
351    /// When set, the controller will send a [`CrashNotification`] whenever
352    /// a consumer crashes.
353    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
354        self.crash_notifier = Some(tx);
355    }
356
357    /// Set a global error handler applied to all routes without a per-route handler.
358    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
359        self.global_error_handler = Some(config);
360    }
361
362    /// Configure tracing for this route controller.
363    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
364        self.tracing_enabled = config.enabled;
365        self.tracer_detail_level = config.detail_level.clone();
366        self.tracer_metrics = config.metrics_collector.clone();
367    }
368
369    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
370        let mut producer_ctx = ProducerContext::new();
371        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
372            producer_ctx = producer_ctx.with_runtime(runtime);
373        }
374        Ok(producer_ctx)
375    }
376
377    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
378    fn resolve_error_handler(
379        &self,
380        config: ErrorHandlerConfig,
381        producer_ctx: &ProducerContext,
382        registry: &Registry,
383    ) -> Result<ErrorHandlerLayer, CamelError> {
384        // Resolve DLC URI → producer.
385        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
386            let parsed = parse_uri(uri)?;
387            let component = registry.get_or_err(&parsed.scheme)?;
388            let endpoint = component.create_endpoint(uri)?;
389            Some(endpoint.create_producer(producer_ctx)?)
390        } else {
391            None
392        };
393
394        // Resolve per-policy `handled_by` URIs.
395        let mut resolved_policies = Vec::new();
396        for policy in config.policies {
397            let handler_producer = if let Some(ref uri) = policy.handled_by {
398                let parsed = parse_uri(uri)?;
399                let component = registry.get_or_err(&parsed.scheme)?;
400                let endpoint = component.create_endpoint(uri)?;
401                Some(endpoint.create_producer(producer_ctx)?)
402            } else {
403                None
404            };
405            resolved_policies.push((policy, handler_producer));
406        }
407
408        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
409    }
410
411    /// Resolve a `UnitOfWorkConfig` into an `(ExchangeUoWLayer, Arc<AtomicU64>)`.
412    /// Returns `Err` if any hook URI cannot be resolved.
413    fn resolve_uow_layer(
414        &self,
415        config: &UnitOfWorkConfig,
416        producer_ctx: &ProducerContext,
417        registry: &Registry,
418        counter: Option<Arc<AtomicU64>>,
419    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
420        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
421            let parsed = parse_uri(uri)?;
422            let component = registry.get_or_err(&parsed.scheme)?;
423            let endpoint = component.create_endpoint(uri)?;
424            endpoint.create_producer(producer_ctx).map_err(|e| {
425                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
426            })
427        };
428
429        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
430        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
431
432        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
433        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
434        Ok((layer, counter))
435    }
436
437    fn resolve_language(&self, language: &str) -> Result<Arc<dyn Language>, CamelError> {
438        let guard = self
439            .languages
440            .lock()
441            .expect("mutex poisoned: another thread panicked while holding this lock");
442        guard.get(language).cloned().ok_or_else(|| {
443            CamelError::RouteError(format!(
444                "language `{language}` is not registered in CamelContext"
445            ))
446        })
447    }
448
449    fn compile_language_expression(
450        &self,
451        expression: &LanguageExpressionDef,
452    ) -> Result<Arc<dyn Expression>, CamelError> {
453        let language = self.resolve_language(&expression.language)?;
454        let compiled = language
455            .create_expression(&expression.source)
456            .map_err(|e| {
457                CamelError::RouteError(format!(
458                    "failed to compile {} expression `{}`: {e}",
459                    expression.language, expression.source
460                ))
461            })?;
462        Ok(Arc::from(compiled))
463    }
464
465    fn compile_language_predicate(
466        &self,
467        expression: &LanguageExpressionDef,
468    ) -> Result<Arc<dyn Predicate>, CamelError> {
469        let language = self.resolve_language(&expression.language)?;
470        let compiled = language.create_predicate(&expression.source).map_err(|e| {
471            CamelError::RouteError(format!(
472                "failed to compile {} predicate `{}`: {e}",
473                expression.language, expression.source
474            ))
475        })?;
476        Ok(Arc::from(compiled))
477    }
478
479    fn compile_filter_predicate(
480        &self,
481        expression: &LanguageExpressionDef,
482    ) -> Result<FilterPredicate, CamelError> {
483        let predicate = self.compile_language_predicate(expression)?;
484        Ok(Arc::new(move |exchange: &Exchange| {
485            predicate.matches(exchange).unwrap_or(false)
486        }))
487    }
488
489    fn value_to_body(value: Value) -> Body {
490        match value {
491            Value::Null => Body::Empty,
492            Value::String(text) => Body::Text(text),
493            other => Body::Json(other),
494        }
495    }
496
497    /// Resolve BuilderSteps into BoxProcessors.
498    pub(crate) fn resolve_steps(
499        &self,
500        steps: Vec<BuilderStep>,
501        producer_ctx: &ProducerContext,
502        registry: &Arc<std::sync::Mutex<Registry>>,
503    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
504        let resolve_producer = |uri: &str| -> Result<BoxProcessor, CamelError> {
505            let parsed = parse_uri(uri)?;
506            let registry_guard = registry
507                .lock()
508                .expect("mutex poisoned: another thread panicked while holding this lock");
509            let component = registry_guard.get_or_err(&parsed.scheme)?;
510            let endpoint = component.create_endpoint(uri)?;
511            endpoint.create_producer(producer_ctx)
512        };
513
514        let mut processors: Vec<(BoxProcessor, Option<camel_api::BodyType>)> = Vec::new();
515        for step in steps {
516            match step {
517                BuilderStep::Processor(svc) => {
518                    processors.push((svc, None));
519                }
520                BuilderStep::To(uri) => {
521                    let parsed = parse_uri(&uri)?;
522                    let registry_guard = registry
523                        .lock()
524                        .expect("mutex poisoned: another thread panicked while holding this lock");
525                    let component = registry_guard.get_or_err(&parsed.scheme)?;
526                    let endpoint = component.create_endpoint(&uri)?;
527                    let contract = endpoint.body_contract();
528                    let producer = endpoint.create_producer(producer_ctx)?;
529                    processors.push((producer, contract));
530                }
531                BuilderStep::Stop => {
532                    processors.push((BoxProcessor::new(camel_processor::StopService), None));
533                }
534                BuilderStep::Log { level, message } => {
535                    let svc = camel_processor::LogProcessor::new(level, message);
536                    processors.push((BoxProcessor::new(svc), None));
537                }
538                BuilderStep::DeclarativeSetHeader { key, value } => match value {
539                    ValueSourceDef::Literal(value) => {
540                        let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
541                        processors.push((BoxProcessor::new(svc), None));
542                    }
543                    ValueSourceDef::Expression(expression) => {
544                        let expression = self.compile_language_expression(&expression)?;
545                        let svc = camel_processor::DynamicSetHeader::new(
546                            IdentityProcessor,
547                            key,
548                            move |exchange: &Exchange| {
549                                expression.evaluate(exchange).unwrap_or(Value::Null)
550                            },
551                        );
552                        processors.push((BoxProcessor::new(svc), None));
553                    }
554                },
555                BuilderStep::DeclarativeSetBody { value } => match value {
556                    ValueSourceDef::Literal(value) => {
557                        let body = Self::value_to_body(value);
558                        let svc = camel_processor::SetBody::new(
559                            IdentityProcessor,
560                            move |_exchange: &Exchange| body.clone(),
561                        );
562                        processors.push((BoxProcessor::new(svc), None));
563                    }
564                    ValueSourceDef::Expression(expression) => {
565                        let expression = self.compile_language_expression(&expression)?;
566                        let svc = camel_processor::SetBody::new(
567                            IdentityProcessor,
568                            move |exchange: &Exchange| {
569                                let value = expression.evaluate(exchange).unwrap_or(Value::Null);
570                                Self::value_to_body(value)
571                            },
572                        );
573                        processors.push((BoxProcessor::new(svc), None));
574                    }
575                },
576                BuilderStep::DeclarativeFilter { predicate, steps } => {
577                    let predicate = self.compile_filter_predicate(&predicate)?;
578                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
579                    let sub_processors: Vec<BoxProcessor> =
580                        sub_pairs.into_iter().map(|(p, _)| p).collect();
581                    let sub_pipeline = compose_pipeline(sub_processors);
582                    let svc =
583                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
584                    processors.push((BoxProcessor::new(svc), None));
585                }
586                BuilderStep::DeclarativeChoice { whens, otherwise } => {
587                    let mut when_clauses = Vec::new();
588                    for when_step in whens {
589                        let predicate = self.compile_filter_predicate(&when_step.predicate)?;
590                        let sub_pairs =
591                            self.resolve_steps(when_step.steps, producer_ctx, registry)?;
592                        let sub_processors: Vec<BoxProcessor> =
593                            sub_pairs.into_iter().map(|(p, _)| p).collect();
594                        let pipeline = compose_pipeline(sub_processors);
595                        when_clauses.push(WhenClause {
596                            predicate,
597                            pipeline,
598                        });
599                    }
600                    let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
601                        let sub_pairs =
602                            self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
603                        let sub_processors: Vec<BoxProcessor> =
604                            sub_pairs.into_iter().map(|(p, _)| p).collect();
605                        Some(compose_pipeline(sub_processors))
606                    } else {
607                        None
608                    };
609                    let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
610                    processors.push((BoxProcessor::new(svc), None));
611                }
612                BuilderStep::DeclarativeScript { expression } => {
613                    let lang = self.resolve_language(&expression.language)?;
614                    match lang.create_mutating_expression(&expression.source) {
615                        Ok(mut_expr) => {
616                            processors
617                                .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
618                        }
619                        Err(LanguageError::NotSupported { .. }) => {
620                            // Graceful degradation: YAML declarative routes fall back to read-only
621                            // Expression → SetBody when the language doesn't support MutatingExpression.
622                            // This preserves backwards compatibility for languages like Simple that
623                            // only implement Expression. Contrast with the explicit .script() DSL step
624                            // which hard-errors on NotSupported (user opted in to mutation semantics).
625                            // TODO: add integration test asserting Simple language falls back to
626                            // read-only path (requires full CamelContext test harness).
627                            let expression = self.compile_language_expression(&expression)?;
628                            let svc = camel_processor::SetBody::new(
629                                IdentityProcessor,
630                                move |exchange: &Exchange| {
631                                    let value =
632                                        expression.evaluate(exchange).unwrap_or(Value::Null);
633                                    Self::value_to_body(value)
634                                },
635                            );
636                            processors.push((BoxProcessor::new(svc), None));
637                        }
638                        Err(e) => {
639                            return Err(CamelError::RouteError(format!(
640                                "Failed to create mutating expression for language '{}': {}",
641                                expression.language, e
642                            )));
643                        }
644                    }
645                }
646                BuilderStep::Split { config, steps } => {
647                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
648                    let sub_processors: Vec<BoxProcessor> =
649                        sub_pairs.into_iter().map(|(p, _)| p).collect();
650                    let sub_pipeline = compose_pipeline(sub_processors);
651                    let splitter =
652                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
653                    processors.push((BoxProcessor::new(splitter), None));
654                }
655                BuilderStep::DeclarativeSplit {
656                    expression,
657                    aggregation,
658                    parallel,
659                    parallel_limit,
660                    stop_on_exception,
661                    steps,
662                } => {
663                    let lang_expr = self.compile_language_expression(&expression)?;
664                    let split_fn = move |exchange: &Exchange| {
665                        let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
666                        match value {
667                            Value::String(s) => s
668                                .lines()
669                                .filter(|line| !line.is_empty())
670                                .map(|line| {
671                                    let mut fragment = exchange.clone();
672                                    fragment.input.body = Body::from(line.to_string());
673                                    fragment
674                                })
675                                .collect(),
676                            Value::Array(arr) => arr
677                                .into_iter()
678                                .map(|v| {
679                                    let mut fragment = exchange.clone();
680                                    fragment.input.body = Body::from(v);
681                                    fragment
682                                })
683                                .collect(),
684                            _ => vec![exchange.clone()],
685                        }
686                    };
687
688                    let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
689                        .aggregation(aggregation)
690                        .parallel(parallel)
691                        .stop_on_exception(stop_on_exception);
692                    if let Some(limit) = parallel_limit {
693                        config = config.parallel_limit(limit);
694                    }
695
696                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
697                    let sub_processors: Vec<BoxProcessor> =
698                        sub_pairs.into_iter().map(|(p, _)| p).collect();
699                    let sub_pipeline = compose_pipeline(sub_processors);
700                    let splitter =
701                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
702                    processors.push((BoxProcessor::new(splitter), None));
703                }
704                BuilderStep::Aggregate { config } => {
705                    let svc = camel_processor::AggregatorService::new(config);
706                    processors.push((BoxProcessor::new(svc), None));
707                }
708                BuilderStep::Filter { predicate, steps } => {
709                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
710                    let sub_processors: Vec<BoxProcessor> =
711                        sub_pairs.into_iter().map(|(p, _)| p).collect();
712                    let sub_pipeline = compose_pipeline(sub_processors);
713                    let svc =
714                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
715                    processors.push((BoxProcessor::new(svc), None));
716                }
717                BuilderStep::Choice { whens, otherwise } => {
718                    // Resolve each when clause's sub-steps into a pipeline.
719                    let mut when_clauses = Vec::new();
720                    for when_step in whens {
721                        let sub_pairs =
722                            self.resolve_steps(when_step.steps, producer_ctx, registry)?;
723                        let sub_processors: Vec<BoxProcessor> =
724                            sub_pairs.into_iter().map(|(p, _)| p).collect();
725                        let pipeline = compose_pipeline(sub_processors);
726                        when_clauses.push(WhenClause {
727                            predicate: when_step.predicate,
728                            pipeline,
729                        });
730                    }
731                    // Resolve otherwise branch (if present).
732                    let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
733                        let sub_pairs =
734                            self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
735                        let sub_processors: Vec<BoxProcessor> =
736                            sub_pairs.into_iter().map(|(p, _)| p).collect();
737                        Some(compose_pipeline(sub_processors))
738                    } else {
739                        None
740                    };
741                    let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
742                    processors.push((BoxProcessor::new(svc), None));
743                }
744                BuilderStep::WireTap { uri } => {
745                    let producer = resolve_producer(&uri)?;
746                    let svc = camel_processor::WireTapService::new(producer);
747                    processors.push((BoxProcessor::new(svc), None));
748                }
749                BuilderStep::Multicast { config, steps } => {
750                    // Each top-level step in the multicast scope becomes an independent endpoint.
751                    let mut endpoints = Vec::new();
752                    for step in steps {
753                        let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
754                        let sub_processors: Vec<BoxProcessor> =
755                            sub_pairs.into_iter().map(|(p, _)| p).collect();
756                        let endpoint = compose_pipeline(sub_processors);
757                        endpoints.push(endpoint);
758                    }
759                    let svc = camel_processor::MulticastService::new(endpoints, config);
760                    processors.push((BoxProcessor::new(svc), None));
761                }
762                BuilderStep::DeclarativeLog { level, message } => {
763                    let ValueSourceDef::Expression(expression) = message else {
764                        // Literal case is already converted to a Processor in compile.rs;
765                        // this arm should never be reached for literals.
766                        unreachable!(
767                            "DeclarativeLog with Literal should have been compiled to a Processor"
768                        );
769                    };
770                    let expression = self.compile_language_expression(&expression)?;
771                    let svc =
772                        camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
773                            expression
774                                .evaluate(exchange)
775                                .unwrap_or_else(|e| {
776                                    warn!(error = %e, "log expression evaluation failed");
777                                    Value::Null
778                                })
779                                .to_string()
780                        });
781                    processors.push((BoxProcessor::new(svc), None));
782                }
783                BuilderStep::Bean { name, method } => {
784                    // Lock beans registry to lookup bean
785                    let beans = self.beans.lock().expect(
786                        "beans mutex poisoned: another thread panicked while holding this lock",
787                    );
788
789                    // Lookup bean by name
790                    let bean = beans.get(&name).ok_or_else(|| {
791                        CamelError::ProcessorError(format!("Bean not found: {}", name))
792                    })?;
793
794                    // Clone Arc for async closure (release lock before async)
795                    let bean_clone = Arc::clone(&bean);
796                    let method = method.clone();
797
798                    // Create processor that invokes bean method
799                    let processor = tower::service_fn(move |mut exchange: Exchange| {
800                        let bean = Arc::clone(&bean_clone);
801                        let method = method.clone();
802
803                        async move {
804                            bean.call(&method, &mut exchange).await?;
805                            Ok(exchange)
806                        }
807                    });
808
809                    processors.push((BoxProcessor::new(processor), None));
810                }
811                BuilderStep::Script { language, script } => {
812                    let lang = self.resolve_language(&language)?;
813                    match lang.create_mutating_expression(&script) {
814                        Ok(mut_expr) => {
815                            processors
816                                .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
817                        }
818                        Err(LanguageError::NotSupported {
819                            feature,
820                            language: ref lang_name,
821                        }) => {
822                            // Hard error: the .script() DSL step explicitly requests mutation semantics.
823                            // If the language doesn't support MutatingExpression, the route is mis-configured.
824                            return Err(CamelError::RouteError(format!(
825                                "Language '{}' does not support {} (required for .script() step)",
826                                lang_name, feature
827                            )));
828                        }
829                        Err(e) => {
830                            return Err(CamelError::RouteError(format!(
831                                "Failed to create mutating expression for language '{}': {}",
832                                language, e
833                            )));
834                        }
835                    }
836                }
837                BuilderStep::Throttle { config, steps } => {
838                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
839                    let sub_processors: Vec<BoxProcessor> =
840                        sub_pairs.into_iter().map(|(p, _)| p).collect();
841                    let sub_pipeline = compose_pipeline(sub_processors);
842                    let svc =
843                        camel_processor::throttler::ThrottlerService::new(config, sub_pipeline);
844                    processors.push((BoxProcessor::new(svc), None));
845                }
846                BuilderStep::LoadBalance { config, steps } => {
847                    // Each top-level step in the load_balance scope becomes an independent endpoint.
848                    let mut endpoints = Vec::new();
849                    for step in steps {
850                        let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
851                        let sub_processors: Vec<BoxProcessor> =
852                            sub_pairs.into_iter().map(|(p, _)| p).collect();
853                        let endpoint = compose_pipeline(sub_processors);
854                        endpoints.push(endpoint);
855                    }
856                    let svc =
857                        camel_processor::load_balancer::LoadBalancerService::new(endpoints, config);
858                    processors.push((BoxProcessor::new(svc), None));
859                }
860                BuilderStep::DynamicRouter { config } => {
861                    use camel_api::EndpointResolver;
862
863                    let producer_ctx_clone = producer_ctx.clone();
864                    let registry_clone = Arc::clone(registry);
865                    let resolver: EndpointResolver = Arc::new(move |uri: &str| {
866                        let parsed = match parse_uri(uri) {
867                            Ok(p) => p,
868                            Err(_) => return None,
869                        };
870                        let registry_guard = match registry_clone.lock() {
871                            Ok(g) => g,
872                            Err(_) => return None, // mutex poisoned
873                        };
874                        let component = match registry_guard.get_or_err(&parsed.scheme) {
875                            Ok(c) => c,
876                            Err(_) => return None,
877                        };
878                        let endpoint = match component.create_endpoint(uri) {
879                            Ok(e) => e,
880                            Err(_) => return None,
881                        };
882                        let producer = match endpoint.create_producer(&producer_ctx_clone) {
883                            Ok(p) => p,
884                            Err(_) => return None,
885                        };
886                        Some(BoxProcessor::new(producer))
887                    });
888                    let svc = camel_processor::dynamic_router::DynamicRouterService::new(
889                        config, resolver,
890                    );
891                    processors.push((BoxProcessor::new(svc), None));
892                }
893                BuilderStep::RoutingSlip { config } => {
894                    use camel_api::EndpointResolver;
895
896                    let producer_ctx_clone = producer_ctx.clone();
897                    let registry_clone = registry.clone();
898                    let resolver: EndpointResolver = Arc::new(move |uri: &str| {
899                        let parsed = match parse_uri(uri) {
900                            Ok(p) => p,
901                            Err(_) => return None,
902                        };
903                        let registry_guard = match registry_clone.lock() {
904                            Ok(g) => g,
905                            Err(_) => return None,
906                        };
907                        let component = match registry_guard.get_or_err(&parsed.scheme) {
908                            Ok(c) => c,
909                            Err(_) => return None,
910                        };
911                        let endpoint = match component.create_endpoint(uri) {
912                            Ok(e) => e,
913                            Err(_) => return None,
914                        };
915                        let producer = match endpoint.create_producer(&producer_ctx_clone) {
916                            Ok(p) => p,
917                            Err(_) => return None,
918                        };
919                        Some(BoxProcessor::new(producer))
920                    });
921
922                    let svc =
923                        camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
924                    processors.push((BoxProcessor::new(svc), None));
925                }
926            }
927        }
928        Ok(processors)
929    }
930
931    /// Add a route definition to the controller.
932    ///
933    /// Steps are resolved immediately using the registry.
934    ///
935    /// # Errors
936    ///
937    /// Returns an error if:
938    /// - A route with the same ID already exists
939    /// - Step resolution fails
940    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
941        let route_id = definition.route_id().to_string();
942
943        if self.routes.contains_key(&route_id) {
944            return Err(CamelError::RouteError(format!(
945                "Route '{}' already exists",
946                route_id
947            )));
948        }
949
950        info!(route_id = %route_id, "Adding route to controller");
951
952        // Extract definition info for storage before steps are consumed
953        let definition_info = definition.to_info();
954        let from_uri = definition.from_uri.to_string();
955        let concurrency = definition.concurrency;
956
957        // Create ProducerContext from self_ref for step resolution
958        let producer_ctx = self.build_producer_context()?;
959
960        // Resolve steps into processors (takes ownership of steps)
961        let processors_with_contracts =
962            self.resolve_steps(definition.steps, &producer_ctx, &self.registry)?;
963        let route_id_for_tracing = route_id.clone();
964        let mut pipeline = compose_traced_pipeline_with_contracts(
965            processors_with_contracts,
966            &route_id_for_tracing,
967            self.tracing_enabled,
968            self.tracer_detail_level.clone(),
969            self.tracer_metrics.clone(),
970        );
971
972        // Apply circuit breaker if configured
973        if let Some(cb_config) = definition.circuit_breaker {
974            let cb_layer = CircuitBreakerLayer::new(cb_config);
975            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
976        }
977
978        // Determine which error handler config to use (per-route takes precedence)
979        let eh_config = definition
980            .error_handler
981            .or_else(|| self.global_error_handler.clone());
982
983        if let Some(config) = eh_config {
984            // Lock registry for error handler resolution
985            let registry = self
986                .registry
987                .lock()
988                .expect("mutex poisoned: another thread panicked while holding this lock");
989            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
990            pipeline = BoxProcessor::new(layer.layer(pipeline));
991        }
992
993        // Apply UoW layer outermost (after error handler)
994        let uow_counter = if let Some(uow_config) = &definition.unit_of_work {
995            let registry = self
996                .registry
997                .lock()
998                .expect("mutex poisoned: registry lock in add_route uow");
999            let (uow_layer, counter) =
1000                self.resolve_uow_layer(uow_config, &producer_ctx, &registry, None)?;
1001            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1002            Some(counter)
1003        } else {
1004            None
1005        };
1006
1007        self.routes.insert(
1008            route_id.clone(),
1009            ManagedRoute {
1010                definition: definition_info,
1011                from_uri,
1012                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
1013                concurrency,
1014                consumer_handle: None,
1015                pipeline_handle: None,
1016                consumer_cancel_token: CancellationToken::new(),
1017                pipeline_cancel_token: CancellationToken::new(),
1018                channel_sender: None,
1019                in_flight: uow_counter,
1020            },
1021        );
1022
1023        Ok(())
1024    }
1025
1026    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting into the route map.
1027    ///
1028    /// Used by hot-reload to prepare a new pipeline for atomic swap without disrupting
1029    /// the running route. The caller is responsible for swapping via `swap_pipeline`.
1030    pub fn compile_route_definition(
1031        &self,
1032        def: RouteDefinition,
1033    ) -> Result<BoxProcessor, CamelError> {
1034        let route_id = def.route_id().to_string();
1035
1036        let producer_ctx = self.build_producer_context()?;
1037
1038        let processors_with_contracts =
1039            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
1040        let mut pipeline = compose_traced_pipeline_with_contracts(
1041            processors_with_contracts,
1042            &route_id,
1043            self.tracing_enabled,
1044            self.tracer_detail_level.clone(),
1045            self.tracer_metrics.clone(),
1046        );
1047
1048        if let Some(cb_config) = def.circuit_breaker {
1049            let cb_layer = CircuitBreakerLayer::new(cb_config);
1050            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
1051        }
1052
1053        let eh_config = def
1054            .error_handler
1055            .clone()
1056            .or_else(|| self.global_error_handler.clone());
1057        if let Some(config) = eh_config {
1058            // Lock registry for error handler resolution
1059            let registry = self
1060                .registry
1061                .lock()
1062                .expect("mutex poisoned: registry lock in compile_route_definition");
1063            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
1064            pipeline = BoxProcessor::new(layer.layer(pipeline));
1065        }
1066
1067        // Apply UoW layer outermost
1068        if let Some(uow_config) = &def.unit_of_work {
1069            let existing_counter = self
1070                .routes
1071                .get(&route_id)
1072                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
1073
1074            let registry = self
1075                .registry
1076                .lock()
1077                .expect("mutex poisoned: registry lock in compile_route_definition uow");
1078
1079            let (uow_layer, _counter) =
1080                self.resolve_uow_layer(uow_config, &producer_ctx, &registry, existing_counter)?;
1081
1082            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1083        }
1084
1085        Ok(pipeline)
1086    }
1087
1088    /// Remove a route from the controller map.
1089    ///
1090    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
1091    /// Returns an error if the route is still running or does not exist.
1092    /// Does not cancel any running tasks — call `stop_route` first.
1093    pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1094        let managed = self.routes.get(route_id).ok_or_else(|| {
1095            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
1096        })?;
1097        if handle_is_running(&managed.consumer_handle)
1098            || handle_is_running(&managed.pipeline_handle)
1099        {
1100            return Err(CamelError::RouteError(format!(
1101                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
1102                route_id,
1103                inferred_lifecycle_label(managed)
1104            )));
1105        }
1106        self.routes.remove(route_id);
1107        info!(route_id = %route_id, "Route removed from controller");
1108        Ok(())
1109    }
1110
1111    /// Returns the number of routes in the controller.
1112    pub fn route_count(&self) -> usize {
1113        self.routes.len()
1114    }
1115
1116    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1117        self.routes.get(route_id).map(|r| {
1118            r.in_flight
1119                .as_ref()
1120                .map_or(0, |c| c.load(Ordering::Relaxed))
1121        })
1122    }
1123
1124    /// Returns `true` if a route with the given ID exists.
1125    pub fn route_exists(&self, route_id: &str) -> bool {
1126        self.routes.contains_key(route_id)
1127    }
1128
1129    /// Returns all route IDs.
1130    pub fn route_ids(&self) -> Vec<String> {
1131        self.routes.keys().cloned().collect()
1132    }
1133
1134    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
1135    pub fn auto_startup_route_ids(&self) -> Vec<String> {
1136        let mut pairs: Vec<(String, i32)> = self
1137            .routes
1138            .iter()
1139            .filter(|(_, managed)| managed.definition.auto_startup())
1140            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1141            .collect();
1142        pairs.sort_by_key(|(_, order)| *order);
1143        pairs.into_iter().map(|(id, _)| id).collect()
1144    }
1145
1146    /// Returns route IDs sorted by shutdown order (startup order descending).
1147    pub fn shutdown_route_ids(&self) -> Vec<String> {
1148        let mut pairs: Vec<(String, i32)> = self
1149            .routes
1150            .iter()
1151            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1152            .collect();
1153        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1154        pairs.into_iter().map(|(id, _)| id).collect()
1155    }
1156
1157    /// Atomically swap the pipeline of a route.
1158    ///
1159    /// In-flight requests finish with the old pipeline (kept alive by Arc).
1160    /// New requests immediately use the new pipeline.
1161    pub fn swap_pipeline(
1162        &self,
1163        route_id: &str,
1164        new_pipeline: BoxProcessor,
1165    ) -> Result<(), CamelError> {
1166        let managed = self
1167            .routes
1168            .get(route_id)
1169            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1170
1171        managed
1172            .pipeline
1173            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1174        info!(route_id = %route_id, "Pipeline swapped atomically");
1175        Ok(())
1176    }
1177
1178    /// Returns the from_uri of a route, if it exists.
1179    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1180        self.routes.get(route_id).map(|r| r.from_uri.clone())
1181    }
1182
1183    /// Get a clone of the current pipeline for a route.
1184    ///
1185    /// This is useful for testing and introspection.
1186    /// Returns `None` if the route doesn't exist.
1187    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1188        self.routes
1189            .get(route_id)
1190            .map(|r| r.pipeline.load().0.clone())
1191    }
1192
1193    /// Internal stop implementation that can set custom status.
1194    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1195        let managed = self
1196            .routes
1197            .get_mut(route_id)
1198            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1199
1200        if !handle_is_running(&managed.consumer_handle)
1201            && !handle_is_running(&managed.pipeline_handle)
1202        {
1203            return Ok(());
1204        }
1205
1206        info!(route_id = %route_id, "Stopping route");
1207
1208        // Cancel both tokens to signal shutdown for consumer and pipeline independently
1209        let managed = self
1210            .routes
1211            .get_mut(route_id)
1212            .expect("invariant: route must exist after prior existence check");
1213        managed.consumer_cancel_token.cancel();
1214        managed.pipeline_cancel_token.cancel();
1215
1216        // Take handles directly (no Arc<Mutex> wrapper needed)
1217        let managed = self
1218            .routes
1219            .get_mut(route_id)
1220            .expect("invariant: route must exist after prior existence check");
1221        let consumer_handle = managed.consumer_handle.take();
1222        let pipeline_handle = managed.pipeline_handle.take();
1223
1224        // IMPORTANT: Drop channel_sender early so rx.recv() returns None
1225        // This ensures the pipeline task can exit even if idle on recv()
1226        let managed = self
1227            .routes
1228            .get_mut(route_id)
1229            .expect("invariant: route must exist after prior existence check");
1230        managed.channel_sender = None;
1231
1232        // Wait for tasks to complete with timeout
1233        // The CancellationToken already signaled tasks to stop gracefully.
1234        // Combined with the select! in pipeline loops, this should exit quickly.
1235        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1236            match (consumer_handle, pipeline_handle) {
1237                (Some(c), Some(p)) => {
1238                    let _ = tokio::join!(c, p);
1239                }
1240                (Some(c), None) => {
1241                    let _ = c.await;
1242                }
1243                (None, Some(p)) => {
1244                    let _ = p.await;
1245                }
1246                (None, None) => {}
1247            }
1248        })
1249        .await;
1250
1251        if timeout_result.is_err() {
1252            warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
1253        }
1254
1255        // Get the managed route again (can't hold across await)
1256        let managed = self
1257            .routes
1258            .get_mut(route_id)
1259            .expect("invariant: route must exist after prior existence check");
1260
1261        // Create fresh cancellation tokens for next start
1262        managed.consumer_cancel_token = CancellationToken::new();
1263        managed.pipeline_cancel_token = CancellationToken::new();
1264
1265        info!(route_id = %route_id, "Route stopped");
1266        Ok(())
1267    }
1268}
1269
1270#[async_trait::async_trait]
1271impl RouteController for DefaultRouteController {
1272    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1273        // Check if route exists and can be started.
1274        {
1275            let managed = self
1276                .routes
1277                .get_mut(route_id)
1278                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1279
1280            let consumer_running = handle_is_running(&managed.consumer_handle);
1281            let pipeline_running = handle_is_running(&managed.pipeline_handle);
1282            if consumer_running && pipeline_running {
1283                return Ok(());
1284            }
1285            if !consumer_running && pipeline_running {
1286                return Err(CamelError::RouteError(format!(
1287                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
1288                    route_id
1289                )));
1290            }
1291            if consumer_running && !pipeline_running {
1292                return Err(CamelError::RouteError(format!(
1293                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
1294                    route_id
1295                )));
1296            }
1297        }
1298
1299        info!(route_id = %route_id, "Starting route");
1300
1301        // Get the resolved route info
1302        let (from_uri, pipeline, concurrency) = {
1303            let managed = self
1304                .routes
1305                .get(route_id)
1306                .expect("invariant: route must exist after prior existence check");
1307            (
1308                managed.from_uri.clone(),
1309                Arc::clone(&managed.pipeline),
1310                managed.concurrency.clone(),
1311            )
1312        };
1313
1314        // Clone crash notifier for consumer task
1315        let crash_notifier = self.crash_notifier.clone();
1316        let runtime_for_consumer = self.runtime.clone();
1317
1318        // Parse from URI and create consumer (lock registry for lookup)
1319        let parsed = parse_uri(&from_uri)?;
1320        let registry = self
1321            .registry
1322            .lock()
1323            .expect("mutex poisoned: another thread panicked while holding this lock");
1324        let component = registry.get_or_err(&parsed.scheme)?;
1325        let endpoint = component.create_endpoint(&from_uri)?;
1326        let mut consumer = endpoint.create_consumer()?;
1327        let consumer_concurrency = consumer.concurrency_model();
1328        // Drop the lock before spawning tasks
1329        drop(registry);
1330
1331        // Resolve effective concurrency: route override > consumer default
1332        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1333
1334        // Get the managed route for mutation
1335        let managed = self
1336            .routes
1337            .get_mut(route_id)
1338            .expect("invariant: route must exist after prior existence check");
1339
1340        // Create channel for consumer to send exchanges
1341        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1342        // Create child tokens for independent lifecycle control
1343        let consumer_cancel = managed.consumer_cancel_token.child_token();
1344        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1345        // Clone sender for storage (to reuse on resume)
1346        let tx_for_storage = tx.clone();
1347        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1348
1349        // Start consumer in background task.
1350        let route_id_for_consumer = route_id.to_string();
1351        let consumer_handle = tokio::spawn(async move {
1352            if let Err(e) = consumer.start(consumer_ctx).await {
1353                error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
1354                let error_msg = e.to_string();
1355
1356                // Send crash notification if notifier is configured
1357                if let Some(tx) = crash_notifier {
1358                    let _ = tx
1359                        .send(CrashNotification {
1360                            route_id: route_id_for_consumer.clone(),
1361                            error: error_msg.clone(),
1362                        })
1363                        .await;
1364                }
1365
1366                publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1367                    .await;
1368            }
1369        });
1370
1371        // Spawn pipeline task with its own cancellation token
1372        let pipeline_handle = match effective_concurrency {
1373            ConcurrencyModel::Sequential => {
1374                tokio::spawn(async move {
1375                    loop {
1376                        // Use select! to exit promptly on cancellation even when idle
1377                        let envelope = tokio::select! {
1378                            envelope = rx.recv() => match envelope {
1379                                Some(e) => e,
1380                                None => return, // Channel closed
1381                            },
1382                            _ = pipeline_cancel.cancelled() => {
1383                                // Cancellation requested - exit gracefully
1384                                return;
1385                            }
1386                        };
1387                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1388
1389                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
1390                        let mut pipeline = pipeline.load().0.clone();
1391
1392                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1393                            if let Some(tx) = reply_tx {
1394                                let _ = tx.send(Err(e));
1395                            }
1396                            return;
1397                        }
1398
1399                        let result = pipeline.call(exchange).await;
1400                        if let Some(tx) = reply_tx {
1401                            let _ = tx.send(result);
1402                        } else if let Err(ref e) = result
1403                            && !matches!(e, CamelError::Stopped)
1404                        {
1405                            error!("Pipeline error: {e}");
1406                        }
1407                    }
1408                })
1409            }
1410            ConcurrencyModel::Concurrent { max } => {
1411                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1412                tokio::spawn(async move {
1413                    loop {
1414                        // Use select! to exit promptly on cancellation even when idle
1415                        let envelope = tokio::select! {
1416                            envelope = rx.recv() => match envelope {
1417                                Some(e) => e,
1418                                None => return, // Channel closed
1419                            },
1420                            _ = pipeline_cancel.cancelled() => {
1421                                // Cancellation requested - exit gracefully
1422                                return;
1423                            }
1424                        };
1425                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1426                        let pipe_ref = Arc::clone(&pipeline);
1427                        let sem = sem.clone();
1428                        let cancel = pipeline_cancel.clone();
1429                        tokio::spawn(async move {
1430                            // Acquire semaphore permit if bounded
1431                            let _permit = match &sem {
1432                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
1433                                None => None,
1434                            };
1435
1436                            // Load current pipeline from ArcSwap
1437                            let mut pipe = pipe_ref.load().0.clone();
1438
1439                            // Wait for service ready with circuit breaker backoff
1440                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1441                                if let Some(tx) = reply_tx {
1442                                    let _ = tx.send(Err(e));
1443                                }
1444                                return;
1445                            }
1446
1447                            let result = pipe.call(exchange).await;
1448                            if let Some(tx) = reply_tx {
1449                                let _ = tx.send(result);
1450                            } else if let Err(ref e) = result
1451                                && !matches!(e, CamelError::Stopped)
1452                            {
1453                                error!("Pipeline error: {e}");
1454                            }
1455                        });
1456                    }
1457                })
1458            }
1459        };
1460
1461        // Store handles and update status
1462        let managed = self
1463            .routes
1464            .get_mut(route_id)
1465            .expect("invariant: route must exist after prior existence check");
1466        managed.consumer_handle = Some(consumer_handle);
1467        managed.pipeline_handle = Some(pipeline_handle);
1468        managed.channel_sender = Some(tx_for_storage);
1469
1470        info!(route_id = %route_id, "Route started");
1471        Ok(())
1472    }
1473
1474    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1475        self.stop_route_internal(route_id).await
1476    }
1477
1478    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1479        self.stop_route(route_id).await?;
1480        tokio::time::sleep(Duration::from_millis(100)).await;
1481        self.start_route(route_id).await
1482    }
1483
1484    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1485        // Check route exists and state.
1486        let managed = self
1487            .routes
1488            .get_mut(route_id)
1489            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1490
1491        let consumer_running = handle_is_running(&managed.consumer_handle);
1492        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1493
1494        // Can only suspend from active started state.
1495        if !consumer_running || !pipeline_running {
1496            return Err(CamelError::RouteError(format!(
1497                "Cannot suspend route '{}' with execution lifecycle {}",
1498                route_id,
1499                inferred_lifecycle_label(managed)
1500            )));
1501        }
1502
1503        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1504
1505        // Cancel consumer token only (keep pipeline running)
1506        let managed = self
1507            .routes
1508            .get_mut(route_id)
1509            .expect("invariant: route must exist after prior existence check");
1510        managed.consumer_cancel_token.cancel();
1511
1512        // Take and join consumer handle
1513        let managed = self
1514            .routes
1515            .get_mut(route_id)
1516            .expect("invariant: route must exist after prior existence check");
1517        let consumer_handle = managed.consumer_handle.take();
1518
1519        // Wait for consumer task to complete with timeout
1520        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1521            if let Some(handle) = consumer_handle {
1522                let _ = handle.await;
1523            }
1524        })
1525        .await;
1526
1527        if timeout_result.is_err() {
1528            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1529        }
1530
1531        // Get the managed route again (can't hold across await)
1532        let managed = self
1533            .routes
1534            .get_mut(route_id)
1535            .expect("invariant: route must exist after prior existence check");
1536
1537        // Create fresh cancellation token for consumer (for resume)
1538        managed.consumer_cancel_token = CancellationToken::new();
1539
1540        info!(route_id = %route_id, "Route suspended (pipeline still running)");
1541        Ok(())
1542    }
1543
1544    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1545        // Check route exists and is Suspended-equivalent execution state.
1546        let managed = self
1547            .routes
1548            .get(route_id)
1549            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1550
1551        let consumer_running = handle_is_running(&managed.consumer_handle);
1552        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1553        if consumer_running || !pipeline_running {
1554            return Err(CamelError::RouteError(format!(
1555                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1556                route_id,
1557                inferred_lifecycle_label(managed)
1558            )));
1559        }
1560
1561        // Get the stored channel sender (must exist for a suspended route)
1562        let sender = managed.channel_sender.clone().ok_or_else(|| {
1563            CamelError::RouteError("Suspended route has no channel sender".into())
1564        })?;
1565
1566        // Get from_uri and concurrency for creating new consumer
1567        let from_uri = managed.from_uri.clone();
1568
1569        info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1570
1571        // Parse from URI and create consumer (lock registry for lookup)
1572        let parsed = parse_uri(&from_uri)?;
1573        let registry = self
1574            .registry
1575            .lock()
1576            .expect("mutex poisoned: another thread panicked while holding this lock");
1577        let component = registry.get_or_err(&parsed.scheme)?;
1578        let endpoint = component.create_endpoint(&from_uri)?;
1579        let mut consumer = endpoint.create_consumer()?;
1580        // Drop the lock before spawning tasks
1581        drop(registry);
1582
1583        // Get the managed route for mutation
1584        let managed = self
1585            .routes
1586            .get_mut(route_id)
1587            .expect("invariant: route must exist after prior existence check");
1588
1589        // Create child token for consumer lifecycle
1590        let consumer_cancel = managed.consumer_cancel_token.child_token();
1591
1592        let crash_notifier = self.crash_notifier.clone();
1593        let runtime_for_consumer = self.runtime.clone();
1594
1595        // Create ConsumerContext with the stored sender
1596        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1597
1598        // Spawn consumer task
1599        let route_id_for_consumer = route_id.to_string();
1600        let consumer_handle = tokio::spawn(async move {
1601            if let Err(e) = consumer.start(consumer_ctx).await {
1602                error!(route_id = %route_id_for_consumer, "Consumer error on resume: {e}");
1603                let error_msg = e.to_string();
1604
1605                // Send crash notification if notifier is configured
1606                if let Some(tx) = crash_notifier {
1607                    let _ = tx
1608                        .send(CrashNotification {
1609                            route_id: route_id_for_consumer.clone(),
1610                            error: error_msg.clone(),
1611                        })
1612                        .await;
1613                }
1614
1615                publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1616                    .await;
1617            }
1618        });
1619
1620        // Store consumer handle and update status
1621        let managed = self
1622            .routes
1623            .get_mut(route_id)
1624            .expect("invariant: route must exist after prior existence check");
1625        managed.consumer_handle = Some(consumer_handle);
1626
1627        info!(route_id = %route_id, "Route resumed");
1628        Ok(())
1629    }
1630
1631    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1632        // Only start routes where auto_startup() == true
1633        // Sort by startup_order() ascending before starting
1634        let route_ids: Vec<String> = {
1635            let mut pairs: Vec<_> = self
1636                .routes
1637                .iter()
1638                .filter(|(_, r)| r.definition.auto_startup())
1639                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1640                .collect();
1641            pairs.sort_by_key(|(_, order)| *order);
1642            pairs.into_iter().map(|(id, _)| id).collect()
1643        };
1644
1645        info!("Starting {} auto-startup routes", route_ids.len());
1646
1647        // Collect errors but continue starting remaining routes
1648        let mut errors: Vec<String> = Vec::new();
1649        for route_id in route_ids {
1650            if let Err(e) = self.start_route(&route_id).await {
1651                errors.push(format!("Route '{}': {}", route_id, e));
1652            }
1653        }
1654
1655        if !errors.is_empty() {
1656            return Err(CamelError::RouteError(format!(
1657                "Failed to start routes: {}",
1658                errors.join(", ")
1659            )));
1660        }
1661
1662        info!("All auto-startup routes started");
1663        Ok(())
1664    }
1665
1666    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1667        // Sort by startup_order descending (reverse order)
1668        let route_ids: Vec<String> = {
1669            let mut pairs: Vec<_> = self
1670                .routes
1671                .iter()
1672                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1673                .collect();
1674            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1675            pairs.into_iter().map(|(id, _)| id).collect()
1676        };
1677
1678        info!("Stopping {} routes", route_ids.len());
1679
1680        for route_id in route_ids {
1681            let _ = self.stop_route(&route_id).await;
1682        }
1683
1684        info!("All routes stopped");
1685        Ok(())
1686    }
1687}
1688
1689#[async_trait::async_trait]
1690impl RouteControllerInternal for DefaultRouteController {
1691    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
1692        DefaultRouteController::add_route(self, def)
1693    }
1694
1695    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError> {
1696        DefaultRouteController::swap_pipeline(self, route_id, pipeline)
1697    }
1698
1699    fn route_from_uri(&self, route_id: &str) -> Option<String> {
1700        // Call the inherent method which now returns Option<String>
1701        DefaultRouteController::route_from_uri(self, route_id)
1702    }
1703
1704    fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
1705        DefaultRouteController::set_error_handler(self, config)
1706    }
1707
1708    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
1709        DefaultRouteController::set_self_ref(self, self_ref)
1710    }
1711
1712    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
1713        DefaultRouteController::set_runtime_handle(self, runtime)
1714    }
1715
1716    fn route_count(&self) -> usize {
1717        DefaultRouteController::route_count(self)
1718    }
1719
1720    fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1721        DefaultRouteController::in_flight_count(self, route_id)
1722    }
1723
1724    fn route_exists(&self, route_id: &str) -> bool {
1725        DefaultRouteController::route_exists(self, route_id)
1726    }
1727
1728    fn route_ids(&self) -> Vec<String> {
1729        DefaultRouteController::route_ids(self)
1730    }
1731
1732    fn auto_startup_route_ids(&self) -> Vec<String> {
1733        DefaultRouteController::auto_startup_route_ids(self)
1734    }
1735
1736    fn shutdown_route_ids(&self) -> Vec<String> {
1737        DefaultRouteController::shutdown_route_ids(self)
1738    }
1739
1740    fn set_tracer_config(&mut self, config: &TracerConfig) {
1741        DefaultRouteController::set_tracer_config(self, config)
1742    }
1743
1744    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError> {
1745        DefaultRouteController::compile_route_definition(self, def)
1746    }
1747
1748    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1749        DefaultRouteController::remove_route(self, route_id)
1750    }
1751
1752    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1753        DefaultRouteController::start_route(self, route_id).await
1754    }
1755
1756    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1757        DefaultRouteController::stop_route(self, route_id).await
1758    }
1759}
1760
1761#[cfg(test)]
1762mod tests {
1763    use super::*;
1764    use crate::shared::components::domain::Registry;
1765
1766    fn build_controller() -> DefaultRouteController {
1767        DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())))
1768    }
1769
1770    fn build_controller_with_components() -> DefaultRouteController {
1771        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
1772        {
1773            let mut guard = registry.lock().expect("registry lock");
1774            guard.register(camel_component_timer::TimerComponent::new());
1775            guard.register(camel_component_mock::MockComponent::new());
1776            guard.register(camel_component_log::LogComponent::new());
1777        }
1778        DefaultRouteController::new(registry)
1779    }
1780
1781    fn set_self_ref(controller: &mut DefaultRouteController) {
1782        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
1783        let other: Arc<Mutex<dyn RouteController>> =
1784            Arc::new(Mutex::new(DefaultRouteController::new(registry)));
1785        controller.set_self_ref(other);
1786    }
1787
1788    fn register_simple_language(controller: &mut DefaultRouteController) {
1789        controller.languages.lock().expect("languages lock").insert(
1790            "simple".into(),
1791            Arc::new(camel_language_simple::SimpleLanguage),
1792        );
1793    }
1794
1795    #[test]
1796    fn test_route_controller_internal_is_object_safe() {
1797        let _: Option<Box<dyn RouteControllerInternal>> = None;
1798    }
1799
1800    #[test]
1801    fn helper_functions_cover_non_async_branches() {
1802        let managed = ManagedRoute {
1803            definition: RouteDefinition::new("timer:a", vec![])
1804                .with_route_id("r")
1805                .to_info(),
1806            from_uri: "timer:a".into(),
1807            pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(BoxProcessor::new(
1808                IdentityProcessor,
1809            )))),
1810            concurrency: None,
1811            consumer_handle: None,
1812            pipeline_handle: None,
1813            consumer_cancel_token: CancellationToken::new(),
1814            pipeline_cancel_token: CancellationToken::new(),
1815            channel_sender: None,
1816            in_flight: None,
1817        };
1818
1819        assert_eq!(inferred_lifecycle_label(&managed), "Stopped");
1820        assert!(!handle_is_running(&managed.consumer_handle));
1821
1822        let cmd = runtime_failure_command("route-x", "boom");
1823        match cmd {
1824            RuntimeCommand::FailRoute {
1825                route_id, error, ..
1826            } => {
1827                assert_eq!(route_id, "route-x");
1828                assert_eq!(error, "boom");
1829            }
1830            _ => panic!("expected FailRoute command"),
1831        }
1832    }
1833
1834    #[test]
1835    fn add_route_detects_duplicates() {
1836        let mut controller = build_controller();
1837        set_self_ref(&mut controller);
1838
1839        controller
1840            .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
1841            .expect("add route");
1842
1843        let dup_err = controller
1844            .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
1845            .expect_err("duplicate must fail");
1846        assert!(dup_err.to_string().contains("already exists"));
1847    }
1848
1849    #[test]
1850    fn route_introspection_and_ordering_helpers_work() {
1851        let mut controller = build_controller();
1852        set_self_ref(&mut controller);
1853
1854        controller
1855            .add_route(
1856                RouteDefinition::new("timer:a", vec![])
1857                    .with_route_id("a")
1858                    .with_startup_order(20),
1859            )
1860            .unwrap();
1861        controller
1862            .add_route(
1863                RouteDefinition::new("timer:b", vec![])
1864                    .with_route_id("b")
1865                    .with_startup_order(10),
1866            )
1867            .unwrap();
1868        controller
1869            .add_route(
1870                RouteDefinition::new("timer:c", vec![])
1871                    .with_route_id("c")
1872                    .with_auto_startup(false)
1873                    .with_startup_order(5),
1874            )
1875            .unwrap();
1876
1877        assert_eq!(controller.route_count(), 3);
1878        assert_eq!(controller.route_from_uri("a"), Some("timer:a".into()));
1879        assert!(controller.route_ids().contains(&"a".to_string()));
1880        assert_eq!(
1881            controller.auto_startup_route_ids(),
1882            vec!["b".to_string(), "a".to_string()]
1883        );
1884        assert_eq!(
1885            controller.shutdown_route_ids(),
1886            vec!["a".to_string(), "b".to_string(), "c".to_string()]
1887        );
1888    }
1889
1890    #[test]
1891    fn swap_pipeline_and_remove_route_behaviors() {
1892        let mut controller = build_controller();
1893        set_self_ref(&mut controller);
1894
1895        controller
1896            .add_route(RouteDefinition::new("timer:a", vec![]).with_route_id("swap"))
1897            .unwrap();
1898
1899        controller
1900            .swap_pipeline("swap", BoxProcessor::new(IdentityProcessor))
1901            .unwrap();
1902        assert!(controller.get_pipeline("swap").is_some());
1903
1904        controller.remove_route("swap").unwrap();
1905        assert_eq!(controller.route_count(), 0);
1906
1907        let err = controller
1908            .remove_route("swap")
1909            .expect_err("missing route must fail");
1910        assert!(err.to_string().contains("not found"));
1911    }
1912
1913    #[test]
1914    fn resolve_steps_covers_declarative_and_eip_variants() {
1915        use camel_api::LanguageExpressionDef;
1916        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1917
1918        let mut controller = build_controller_with_components();
1919        set_self_ref(&mut controller);
1920        register_simple_language(&mut controller);
1921
1922        let expr = |source: &str| LanguageExpressionDef {
1923            language: "simple".into(),
1924            source: source.into(),
1925        };
1926
1927        let steps = vec![
1928            BuilderStep::To("mock:out".into()),
1929            BuilderStep::Stop,
1930            BuilderStep::Log {
1931                level: camel_processor::LogLevel::Info,
1932                message: "log".into(),
1933            },
1934            BuilderStep::DeclarativeSetHeader {
1935                key: "k".into(),
1936                value: ValueSourceDef::Literal(Value::String("v".into())),
1937            },
1938            BuilderStep::DeclarativeSetHeader {
1939                key: "k2".into(),
1940                value: ValueSourceDef::Expression(expr("${body}")),
1941            },
1942            BuilderStep::DeclarativeSetBody {
1943                value: ValueSourceDef::Expression(expr("${body}")),
1944            },
1945            BuilderStep::DeclarativeFilter {
1946                predicate: expr("${body} != null"),
1947                steps: vec![BuilderStep::Stop],
1948            },
1949            BuilderStep::DeclarativeChoice {
1950                whens: vec![
1951                    crate::lifecycle::application::route_definition::DeclarativeWhenStep {
1952                        predicate: expr("${body} == 'x'"),
1953                        steps: vec![BuilderStep::Stop],
1954                    },
1955                ],
1956                otherwise: Some(vec![BuilderStep::Stop]),
1957            },
1958            BuilderStep::DeclarativeScript {
1959                expression: expr("${body}"),
1960            },
1961            BuilderStep::Split {
1962                config: SplitterConfig::new(split_body_lines())
1963                    .aggregation(AggregationStrategy::CollectAll),
1964                steps: vec![BuilderStep::Stop],
1965            },
1966            BuilderStep::DeclarativeSplit {
1967                expression: expr("${body}"),
1968                aggregation: AggregationStrategy::Original,
1969                parallel: false,
1970                parallel_limit: Some(2),
1971                stop_on_exception: true,
1972                steps: vec![BuilderStep::Stop],
1973            },
1974            BuilderStep::Aggregate {
1975                config: camel_api::AggregatorConfig::correlate_by("id")
1976                    .complete_when_size(1)
1977                    .build(),
1978            },
1979            BuilderStep::Filter {
1980                predicate: Arc::new(|_| true),
1981                steps: vec![BuilderStep::Stop],
1982            },
1983            BuilderStep::Choice {
1984                whens: vec![crate::lifecycle::application::route_definition::WhenStep {
1985                    predicate: Arc::new(|_| true),
1986                    steps: vec![BuilderStep::Stop],
1987                }],
1988                otherwise: Some(vec![BuilderStep::Stop]),
1989            },
1990            BuilderStep::WireTap {
1991                uri: "mock:tap".into(),
1992            },
1993            BuilderStep::Multicast {
1994                steps: vec![
1995                    BuilderStep::To("mock:m1".into()),
1996                    BuilderStep::To("mock:m2".into()),
1997                ],
1998                config: camel_api::MulticastConfig::new(),
1999            },
2000            BuilderStep::DeclarativeLog {
2001                level: camel_processor::LogLevel::Info,
2002                message: ValueSourceDef::Expression(expr("${body}")),
2003            },
2004            BuilderStep::Throttle {
2005                config: camel_api::ThrottlerConfig::new(10, Duration::from_millis(100)),
2006                steps: vec![BuilderStep::To("mock:t".into())],
2007            },
2008            BuilderStep::LoadBalance {
2009                config: camel_api::LoadBalancerConfig::round_robin(),
2010                steps: vec![
2011                    BuilderStep::To("mock:l1".into()),
2012                    BuilderStep::To("mock:l2".into()),
2013                ],
2014            },
2015            BuilderStep::DynamicRouter {
2016                config: camel_api::DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
2017            },
2018            BuilderStep::RoutingSlip {
2019                config: camel_api::RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
2020            },
2021        ];
2022
2023        let producer_ctx = ProducerContext::new();
2024        let resolved = controller
2025            .resolve_steps(steps, &producer_ctx, &controller.registry)
2026            .expect("resolve should succeed");
2027        assert!(!resolved.is_empty());
2028    }
2029
2030    #[test]
2031    fn resolve_steps_script_requires_mutating_language_support() {
2032        use camel_api::LanguageExpressionDef;
2033
2034        let mut controller = build_controller_with_components();
2035        set_self_ref(&mut controller);
2036        register_simple_language(&mut controller);
2037
2038        let steps = vec![BuilderStep::Script {
2039            language: "simple".into(),
2040            script: "${body}".into(),
2041        }];
2042
2043        let err = controller
2044            .resolve_steps(steps, &ProducerContext::new(), &controller.registry)
2045            .expect_err("simple script should fail for mutating expression");
2046        assert!(err.to_string().contains("does not support"));
2047
2048        let bean_missing = vec![BuilderStep::Bean {
2049            name: "unknown".into(),
2050            method: "run".into(),
2051        }];
2052        let bean_err = controller
2053            .resolve_steps(bean_missing, &ProducerContext::new(), &controller.registry)
2054            .expect_err("missing bean must fail");
2055        assert!(bean_err.to_string().contains("Bean not found"));
2056
2057        let bad_declarative = vec![BuilderStep::DeclarativeScript {
2058            expression: LanguageExpressionDef {
2059                language: "unknown".into(),
2060                source: "x".into(),
2061            },
2062        }];
2063        let lang_err = controller
2064            .resolve_steps(
2065                bad_declarative,
2066                &ProducerContext::new(),
2067                &controller.registry,
2068            )
2069            .expect_err("unknown language must fail");
2070        assert!(lang_err.to_string().contains("not registered"));
2071    }
2072
2073    #[tokio::test]
2074    async fn lifecycle_methods_report_missing_routes() {
2075        let mut controller = build_controller();
2076
2077        assert!(controller.start_route("missing").await.is_err());
2078        assert!(controller.stop_route("missing").await.is_err());
2079        assert!(controller.suspend_route("missing").await.is_err());
2080        assert!(controller.resume_route("missing").await.is_err());
2081    }
2082
2083    #[tokio::test]
2084    async fn start_stop_route_happy_path_with_timer_and_mock() {
2085        let mut controller = build_controller_with_components();
2086        set_self_ref(&mut controller);
2087
2088        let route = RouteDefinition::new(
2089            "timer:tick?period=10&repeatCount=1",
2090            vec![BuilderStep::To("mock:out".into())],
2091        )
2092        .with_route_id("rt-1");
2093        controller.add_route(route).unwrap();
2094
2095        controller.start_route("rt-1").await.unwrap();
2096        tokio::time::sleep(Duration::from_millis(40)).await;
2097        controller.stop_route("rt-1").await.unwrap();
2098
2099        controller.remove_route("rt-1").unwrap();
2100    }
2101
2102    #[tokio::test]
2103    async fn suspend_resume_and_restart_cover_execution_transitions() {
2104        let mut controller = build_controller_with_components();
2105        set_self_ref(&mut controller);
2106
2107        let route = RouteDefinition::new(
2108            "timer:tick?period=30",
2109            vec![BuilderStep::To("mock:out".into())],
2110        )
2111        .with_route_id("rt-2");
2112        controller.add_route(route).unwrap();
2113
2114        controller.start_route("rt-2").await.unwrap();
2115        controller.suspend_route("rt-2").await.unwrap();
2116        controller.resume_route("rt-2").await.unwrap();
2117        controller.restart_route("rt-2").await.unwrap();
2118        controller.stop_route("rt-2").await.unwrap();
2119    }
2120
2121    #[tokio::test]
2122    async fn remove_route_rejects_running_route() {
2123        let mut controller = build_controller_with_components();
2124        set_self_ref(&mut controller);
2125
2126        let route = RouteDefinition::new(
2127            "timer:tick?period=25",
2128            vec![BuilderStep::To("mock:out".into())],
2129        )
2130        .with_route_id("rt-running");
2131        controller.add_route(route).unwrap();
2132        controller.start_route("rt-running").await.unwrap();
2133
2134        let err = controller
2135            .remove_route("rt-running")
2136            .expect_err("running route removal must fail");
2137        assert!(err.to_string().contains("must be stopped before removal"));
2138
2139        controller.stop_route("rt-running").await.unwrap();
2140        controller.remove_route("rt-running").unwrap();
2141    }
2142
2143    #[tokio::test]
2144    async fn start_route_on_suspended_state_returns_guidance_error() {
2145        let mut controller = build_controller_with_components();
2146        set_self_ref(&mut controller);
2147
2148        let route = RouteDefinition::new(
2149            "timer:tick?period=40",
2150            vec![BuilderStep::To("mock:out".into())],
2151        )
2152        .with_route_id("rt-suspend");
2153        controller.add_route(route).unwrap();
2154
2155        controller.start_route("rt-suspend").await.unwrap();
2156        controller.suspend_route("rt-suspend").await.unwrap();
2157
2158        let err = controller
2159            .start_route("rt-suspend")
2160            .await
2161            .expect_err("start from suspended must fail");
2162        assert!(err.to_string().contains("use resume_route"));
2163
2164        controller.resume_route("rt-suspend").await.unwrap();
2165        controller.stop_route("rt-suspend").await.unwrap();
2166    }
2167
2168    #[tokio::test]
2169    async fn suspend_and_resume_validate_execution_state() {
2170        let mut controller = build_controller_with_components();
2171        set_self_ref(&mut controller);
2172
2173        controller
2174            .add_route(
2175                RouteDefinition::new("timer:tick?period=50", vec![]).with_route_id("rt-state"),
2176            )
2177            .unwrap();
2178
2179        let suspend_err = controller
2180            .suspend_route("rt-state")
2181            .await
2182            .expect_err("suspend before start must fail");
2183        assert!(suspend_err.to_string().contains("Cannot suspend route"));
2184
2185        controller.start_route("rt-state").await.unwrap();
2186        let resume_err = controller
2187            .resume_route("rt-state")
2188            .await
2189            .expect_err("resume while started must fail");
2190        assert!(resume_err.to_string().contains("Cannot resume route"));
2191
2192        controller.stop_route("rt-state").await.unwrap();
2193    }
2194
2195    #[tokio::test]
2196    async fn concurrent_concurrency_override_path_executes() {
2197        let mut controller = build_controller_with_components();
2198        set_self_ref(&mut controller);
2199
2200        let route = RouteDefinition::new(
2201            "timer:tick?period=10&repeatCount=2",
2202            vec![BuilderStep::To("mock:out".into())],
2203        )
2204        .with_route_id("rt-concurrent")
2205        .with_concurrency(ConcurrencyModel::Concurrent { max: Some(2) });
2206
2207        controller.add_route(route).unwrap();
2208        controller.start_route("rt-concurrent").await.unwrap();
2209        tokio::time::sleep(Duration::from_millis(50)).await;
2210        controller.stop_route("rt-concurrent").await.unwrap();
2211    }
2212
2213    #[tokio::test]
2214    async fn add_route_with_circuit_breaker_and_error_handler_compiles() {
2215        use camel_api::circuit_breaker::CircuitBreakerConfig;
2216        use camel_api::error_handler::ErrorHandlerConfig;
2217
2218        let mut controller = build_controller_with_components();
2219        set_self_ref(&mut controller);
2220
2221        let route = RouteDefinition::new("timer:tick?period=25", vec![BuilderStep::Stop])
2222            .with_route_id("rt-eh")
2223            .with_circuit_breaker(CircuitBreakerConfig::new())
2224            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"));
2225
2226        controller
2227            .add_route(route)
2228            .expect("route with layers should compile");
2229        controller.start_route("rt-eh").await.unwrap();
2230        controller.stop_route("rt-eh").await.unwrap();
2231    }
2232
2233    #[tokio::test]
2234    async fn compile_and_swap_errors_for_missing_route() {
2235        let mut controller = build_controller_with_components();
2236        set_self_ref(&mut controller);
2237
2238        let compiled = controller
2239            .compile_route_definition(
2240                RouteDefinition::new("timer:tick?period=10", vec![BuilderStep::Stop])
2241                    .with_route_id("compiled"),
2242            )
2243            .expect("compile should work");
2244
2245        let err = controller
2246            .swap_pipeline("nope", compiled)
2247            .expect_err("missing route swap must fail");
2248        assert!(err.to_string().contains("not found"));
2249    }
2250}