Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::{Mutex, mpsc};
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21    BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProducerContext,
22    RouteController, RuntimeCommand, RuntimeHandle, Value, body::Body,
23};
24use camel_component_api::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
25use camel_endpoint::parse_uri;
26use camel_language_api::{Expression, Language, LanguageError, Predicate};
27pub use camel_processor::aggregator::SharedLanguageRegistry;
28use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
29use camel_processor::circuit_breaker::CircuitBreakerLayer;
30use camel_processor::error_handler::ErrorHandlerLayer;
31use camel_processor::script_mutator::ScriptMutator;
32use camel_processor::{ChoiceService, WhenClause};
33
34use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
35use crate::lifecycle::adapters::route_compiler::{
36    compose_pipeline, compose_traced_pipeline_with_contracts,
37};
38use crate::lifecycle::application::route_definition::{
39    BuilderStep, LanguageExpressionDef, RouteDefinition, RouteDefinitionInfo, ValueSourceDef,
40};
41use crate::shared::components::domain::Registry;
42use crate::shared::observability::domain::{DetailLevel, TracerConfig};
43use arc_swap::ArcSwap;
44use camel_bean::BeanRegistry;
45
46/// Notification sent when a route crashes.
47///
48/// Used by [`SupervisingRouteController`](crate::supervising_route_controller::SupervisingRouteController)
49/// to monitor and restart failed routes.
50#[derive(Debug, Clone)]
51pub struct CrashNotification {
52    /// The ID of the crashed route.
53    pub route_id: String,
54    /// The error that caused the crash.
55    pub error: String,
56}
57
58/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
59///
60/// # Safety
61///
62/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
63/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
64///
65/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
66///    (creates a new boxed service from the inner clone).
67/// 2. The clone is owned by the calling thread and never shared.
68/// 3. ArcSwap guarantees we only get & references (no &mut).
69///
70/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
71/// clone() does not mutate shared state and each thread gets an independent copy.
72pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
73unsafe impl Sync for SyncBoxProcessor {}
74
75type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
76
77/// Internal trait extending [`RouteController`] with methods needed by [`CamelContext`]
78/// that are not part of the public lifecycle API.
79///
80/// Both [`DefaultRouteController`] and the future `SupervisingRouteController` implement
81/// this trait, allowing `CamelContext` to hold either as `Arc<Mutex<dyn RouteControllerInternal>>`.
82#[async_trait::async_trait]
83pub trait RouteControllerInternal: RouteController + Send {
84    /// Add a route definition to the controller.
85    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError>;
86
87    /// Atomically swap the pipeline of a running route (for hot-reload).
88    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError>;
89
90    /// Returns the `from_uri` of a route by ID.
91    fn route_from_uri(&self, route_id: &str) -> Option<String>;
92
93    /// Set a global error handler applied to all routes.
94    fn set_error_handler(&mut self, config: ErrorHandlerConfig);
95
96    /// Set the self-reference needed to create `ProducerContext`.
97    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>);
98
99    /// Set runtime handle for ProducerContext command/query access.
100    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>);
101
102    /// Returns the number of routes in the controller.
103    fn route_count(&self) -> usize;
104
105    /// Returns the current in-flight count for a route, or `None` if route not found.
106    fn in_flight_count(&self, route_id: &str) -> Option<u64>;
107
108    /// Returns `true` if a route with the given ID exists.
109    fn route_exists(&self, route_id: &str) -> bool;
110
111    /// Returns all route IDs.
112    fn route_ids(&self) -> Vec<String>;
113
114    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
115    fn auto_startup_route_ids(&self) -> Vec<String>;
116
117    /// Returns route IDs sorted by shutdown order (startup order descending).
118    fn shutdown_route_ids(&self) -> Vec<String>;
119
120    /// Configure tracing from a [`TracerConfig`].
121    fn set_tracer_config(&mut self, config: &TracerConfig);
122
123    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting it into the route map.
124    /// Used by hot-reload to prepare a new pipeline for atomic swap.
125    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError>;
126
127    /// Remove a route from the controller map (route must be stopped first).
128    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError>;
129
130    /// Start a route by ID (for use by hot-reload, where async_trait is required).
131    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
132
133    /// Stop a route by ID (for use by hot-reload, where async_trait is required).
134    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
135}
136
137/// Internal state for a managed route.
138struct AggregateSplitInfo {
139    pre_pipeline: SharedPipeline,
140    agg_config: AggregatorConfig,
141    post_pipeline: SharedPipeline,
142}
143
144struct ManagedRoute {
145    /// The route definition metadata (for introspection).
146    definition: RouteDefinitionInfo,
147    /// Source endpoint URI.
148    from_uri: String,
149    /// Resolved processor pipeline (wrapped for atomic swap).
150    pipeline: SharedPipeline,
151    /// Concurrency model override (if any).
152    concurrency: Option<ConcurrencyModel>,
153    /// Handle for the consumer task (if running).
154    consumer_handle: Option<JoinHandle<()>>,
155    /// Handle for the pipeline task (if running).
156    pipeline_handle: Option<JoinHandle<()>>,
157    /// Cancellation token for stopping the consumer task.
158    /// This allows independent control of the consumer lifecycle (for suspend/resume).
159    consumer_cancel_token: CancellationToken,
160    /// Cancellation token for stopping the pipeline task.
161    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
162    pipeline_cancel_token: CancellationToken,
163    /// Channel sender for sending exchanges to the pipeline.
164    /// Stored to allow resuming a suspended route without recreating the channel.
165    channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
166    /// In-flight exchange counter. `None` when UoW is not configured for this route.
167    in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
168    aggregate_split: Option<AggregateSplitInfo>,
169    agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
170}
171
172fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
173    handle.as_ref().is_some_and(|h| !h.is_finished())
174}
175
176fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
177    match (
178        handle_is_running(&managed.consumer_handle),
179        handle_is_running(&managed.pipeline_handle),
180    ) {
181        (true, true) => "Started",
182        (false, true) => "Suspended",
183        (true, false) => "Stopping",
184        (false, false) => "Stopped",
185    }
186}
187
188fn find_top_level_aggregate_with_timeout(
189    steps: &[BuilderStep],
190) -> Option<(usize, AggregatorConfig)> {
191    for (i, step) in steps.iter().enumerate() {
192        if let BuilderStep::Aggregate { config } = step {
193            if has_timeout_condition(&config.completion) {
194                return Some((i, config.clone()));
195            }
196            break;
197        }
198    }
199    None
200}
201
202fn is_pending(ex: &Exchange) -> bool {
203    ex.property("CamelAggregatorPending")
204        .and_then(|v| v.as_bool())
205        .unwrap_or(false)
206}
207
208/// Wait for a pipeline service to be ready with circuit breaker backoff.
209///
210/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
211/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
212/// cancellation checks. It returns `Ok(())` when the service is ready, or
213/// `Err(e)` if cancellation occurred or a fatal error was encountered.
214async fn ready_with_backoff(
215    pipeline: &mut BoxProcessor,
216    cancel: &CancellationToken,
217) -> Result<(), CamelError> {
218    loop {
219        match pipeline.ready().await {
220            Ok(_) => return Ok(()),
221            Err(CamelError::CircuitOpen(ref msg)) => {
222                warn!("Circuit open, backing off: {msg}");
223                tokio::select! {
224                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
225                        continue;
226                    }
227                    _ = cancel.cancelled() => {
228                        // Shutting down — don't retry.
229                        return Err(CamelError::CircuitOpen(msg.clone()));
230                    }
231                }
232            }
233            Err(e) => {
234                error!("Pipeline not ready: {e}");
235                return Err(e);
236            }
237        }
238    }
239}
240
241fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
242    let stamp = std::time::SystemTime::now()
243        .duration_since(std::time::UNIX_EPOCH)
244        .unwrap_or_default()
245        .as_nanos();
246    RuntimeCommand::FailRoute {
247        route_id: route_id.to_string(),
248        error: error.to_string(),
249        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
250        causation_id: None,
251    }
252}
253
254async fn publish_runtime_failure(
255    runtime: Option<Weak<dyn RuntimeHandle>>,
256    route_id: &str,
257    error: &str,
258) {
259    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
260        return;
261    };
262    let command = runtime_failure_command(route_id, error);
263    if let Err(runtime_error) = runtime.execute(command).await {
264        warn!(
265            route_id = %route_id,
266            error = %runtime_error,
267            "failed to synchronize route crash with runtime projection"
268        );
269    }
270}
271
272/// Default implementation of [`RouteController`].
273///
274/// Manages route lifecycle with support for:
275/// - Starting/stopping individual routes
276/// - Suspending and resuming routes
277/// - Auto-startup with startup ordering
278/// - Graceful shutdown
279pub struct DefaultRouteController {
280    /// Routes indexed by route ID.
281    routes: HashMap<String, ManagedRoute>,
282    /// Reference to the component registry for resolving endpoints.
283    registry: Arc<std::sync::Mutex<Registry>>,
284    /// Shared language registry for resolving declarative language expressions.
285    languages: SharedLanguageRegistry,
286    /// Bean registry for bean method invocation.
287    beans: Arc<std::sync::Mutex<BeanRegistry>>,
288    /// Self-reference for creating ProducerContext.
289    /// Set after construction via `set_self_ref()`.
290    self_ref: Option<Arc<Mutex<dyn RouteController>>>,
291    /// Runtime handle injected into ProducerContext for command/query operations.
292    runtime: Option<Weak<dyn RuntimeHandle>>,
293    /// Optional global error handler applied to all routes without a per-route handler.
294    global_error_handler: Option<ErrorHandlerConfig>,
295    /// Optional crash notifier for supervision.
296    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
297    /// Whether tracing is enabled for route pipelines.
298    tracing_enabled: bool,
299    /// Detail level for tracing when enabled.
300    tracer_detail_level: DetailLevel,
301    /// Metrics collector for tracing processor.
302    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
303}
304
305impl DefaultRouteController {
306    /// Create a new `DefaultRouteController` with the given registry.
307    pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
308        Self::with_beans(
309            registry,
310            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
311        )
312    }
313
314    /// Create a new `DefaultRouteController` with shared bean registry.
315    pub fn with_beans(
316        registry: Arc<std::sync::Mutex<Registry>>,
317        beans: Arc<std::sync::Mutex<BeanRegistry>>,
318    ) -> Self {
319        Self {
320            routes: HashMap::new(),
321            registry,
322            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
323            beans,
324            self_ref: None,
325            runtime: None,
326            global_error_handler: None,
327            crash_notifier: None,
328            tracing_enabled: false,
329            tracer_detail_level: DetailLevel::Minimal,
330            tracer_metrics: None,
331        }
332    }
333
334    /// Create a new `DefaultRouteController` with shared language registry.
335    pub fn with_languages(
336        registry: Arc<std::sync::Mutex<Registry>>,
337        languages: SharedLanguageRegistry,
338    ) -> Self {
339        Self {
340            routes: HashMap::new(),
341            registry,
342            languages,
343            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
344            self_ref: None,
345            runtime: None,
346            global_error_handler: None,
347            crash_notifier: None,
348            tracing_enabled: false,
349            tracer_detail_level: DetailLevel::Minimal,
350            tracer_metrics: None,
351        }
352    }
353
354    /// Set the self-reference for creating ProducerContext.
355    ///
356    /// This must be called after wrapping the controller in `Arc<Mutex<>>`.
357    pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
358        self.self_ref = Some(self_ref);
359    }
360
361    /// Set runtime handle for ProducerContext creation.
362    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
363        self.runtime = Some(Arc::downgrade(&runtime));
364    }
365
366    /// Get the self-reference, if set.
367    ///
368    /// Used by [`SupervisingRouteController`](crate::supervising_route_controller::SupervisingRouteController)
369    /// to spawn the supervision loop.
370    pub fn self_ref_for_supervision(&self) -> Option<Arc<Mutex<dyn RouteController>>> {
371        self.self_ref.clone()
372    }
373
374    /// Get runtime handle for supervision-triggered lifecycle commands, if set.
375    pub fn runtime_handle_for_supervision(&self) -> Option<Arc<dyn RuntimeHandle>> {
376        self.runtime.as_ref().and_then(Weak::upgrade)
377    }
378
379    /// Set the crash notifier for supervision.
380    ///
381    /// When set, the controller will send a [`CrashNotification`] whenever
382    /// a consumer crashes.
383    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
384        self.crash_notifier = Some(tx);
385    }
386
387    /// Set a global error handler applied to all routes without a per-route handler.
388    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
389        self.global_error_handler = Some(config);
390    }
391
392    /// Configure tracing for this route controller.
393    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
394        self.tracing_enabled = config.enabled;
395        self.tracer_detail_level = config.detail_level.clone();
396        self.tracer_metrics = config.metrics_collector.clone();
397    }
398
399    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
400        let mut producer_ctx = ProducerContext::new();
401        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
402            producer_ctx = producer_ctx.with_runtime(runtime);
403        }
404        Ok(producer_ctx)
405    }
406
407    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
408    fn resolve_error_handler(
409        &self,
410        config: ErrorHandlerConfig,
411        producer_ctx: &ProducerContext,
412        registry: &Registry,
413    ) -> Result<ErrorHandlerLayer, CamelError> {
414        // Resolve DLC URI → producer.
415        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
416            let parsed = parse_uri(uri)?;
417            let component = registry.get_or_err(&parsed.scheme)?;
418            let endpoint = component.create_endpoint(uri)?;
419            Some(endpoint.create_producer(producer_ctx)?)
420        } else {
421            None
422        };
423
424        // Resolve per-policy `handled_by` URIs.
425        let mut resolved_policies = Vec::new();
426        for policy in config.policies {
427            let handler_producer = if let Some(ref uri) = policy.handled_by {
428                let parsed = parse_uri(uri)?;
429                let component = registry.get_or_err(&parsed.scheme)?;
430                let endpoint = component.create_endpoint(uri)?;
431                Some(endpoint.create_producer(producer_ctx)?)
432            } else {
433                None
434            };
435            resolved_policies.push((policy, handler_producer));
436        }
437
438        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
439    }
440
441    /// Resolve a `UnitOfWorkConfig` into an `(ExchangeUoWLayer, Arc<AtomicU64>)`.
442    /// Returns `Err` if any hook URI cannot be resolved.
443    fn resolve_uow_layer(
444        &self,
445        config: &UnitOfWorkConfig,
446        producer_ctx: &ProducerContext,
447        registry: &Registry,
448        counter: Option<Arc<AtomicU64>>,
449    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
450        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
451            let parsed = parse_uri(uri)?;
452            let component = registry.get_or_err(&parsed.scheme)?;
453            let endpoint = component.create_endpoint(uri)?;
454            endpoint.create_producer(producer_ctx).map_err(|e| {
455                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
456            })
457        };
458
459        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
460        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;
461
462        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
463        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
464        Ok((layer, counter))
465    }
466
467    fn resolve_language(&self, language: &str) -> Result<Arc<dyn Language>, CamelError> {
468        let guard = self
469            .languages
470            .lock()
471            .expect("mutex poisoned: another thread panicked while holding this lock");
472        guard.get(language).cloned().ok_or_else(|| {
473            CamelError::RouteError(format!(
474                "language `{language}` is not registered in CamelContext"
475            ))
476        })
477    }
478
479    fn compile_language_expression(
480        &self,
481        expression: &LanguageExpressionDef,
482    ) -> Result<Arc<dyn Expression>, CamelError> {
483        let language = self.resolve_language(&expression.language)?;
484        let compiled = language
485            .create_expression(&expression.source)
486            .map_err(|e| {
487                CamelError::RouteError(format!(
488                    "failed to compile {} expression `{}`: {e}",
489                    expression.language, expression.source
490                ))
491            })?;
492        Ok(Arc::from(compiled))
493    }
494
495    fn compile_language_predicate(
496        &self,
497        expression: &LanguageExpressionDef,
498    ) -> Result<Arc<dyn Predicate>, CamelError> {
499        let language = self.resolve_language(&expression.language)?;
500        let compiled = language.create_predicate(&expression.source).map_err(|e| {
501            CamelError::RouteError(format!(
502                "failed to compile {} predicate `{}`: {e}",
503                expression.language, expression.source
504            ))
505        })?;
506        Ok(Arc::from(compiled))
507    }
508
509    fn compile_filter_predicate(
510        &self,
511        expression: &LanguageExpressionDef,
512    ) -> Result<FilterPredicate, CamelError> {
513        let predicate = self.compile_language_predicate(expression)?;
514        Ok(Arc::new(move |exchange: &Exchange| {
515            predicate.matches(exchange).unwrap_or(false)
516        }))
517    }
518
519    fn value_to_body(value: Value) -> Body {
520        match value {
521            Value::Null => Body::Empty,
522            Value::String(text) => Body::Text(text),
523            other => Body::Json(other),
524        }
525    }
526
527    /// Resolve BuilderSteps into BoxProcessors.
528    pub(crate) fn resolve_steps(
529        &self,
530        steps: Vec<BuilderStep>,
531        producer_ctx: &ProducerContext,
532        registry: &Arc<std::sync::Mutex<Registry>>,
533    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
534        let resolve_producer = |uri: &str| -> Result<BoxProcessor, CamelError> {
535            let parsed = parse_uri(uri)?;
536            let registry_guard = registry
537                .lock()
538                .expect("mutex poisoned: another thread panicked while holding this lock");
539            let component = registry_guard.get_or_err(&parsed.scheme)?;
540            let endpoint = component.create_endpoint(uri)?;
541            endpoint.create_producer(producer_ctx)
542        };
543
544        let mut processors: Vec<(BoxProcessor, Option<camel_api::BodyType>)> = Vec::new();
545        for step in steps {
546            match step {
547                BuilderStep::Processor(svc) => {
548                    processors.push((svc, None));
549                }
550                BuilderStep::To(uri) => {
551                    let parsed = parse_uri(&uri)?;
552                    let registry_guard = registry
553                        .lock()
554                        .expect("mutex poisoned: another thread panicked while holding this lock");
555                    let component = registry_guard.get_or_err(&parsed.scheme)?;
556                    let endpoint = component.create_endpoint(&uri)?;
557                    let contract = endpoint.body_contract();
558                    let producer = endpoint.create_producer(producer_ctx)?;
559                    processors.push((producer, contract));
560                }
561                BuilderStep::Stop => {
562                    processors.push((BoxProcessor::new(camel_processor::StopService), None));
563                }
564                BuilderStep::Delay { config } => {
565                    let svc = camel_processor::delayer::DelayerService::new(config);
566                    processors.push((BoxProcessor::new(svc), None));
567                }
568                BuilderStep::Log { level, message } => {
569                    let svc = camel_processor::LogProcessor::new(level, message);
570                    processors.push((BoxProcessor::new(svc), None));
571                }
572                BuilderStep::DeclarativeSetHeader { key, value } => match value {
573                    ValueSourceDef::Literal(value) => {
574                        let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
575                        processors.push((BoxProcessor::new(svc), None));
576                    }
577                    ValueSourceDef::Expression(expression) => {
578                        let expression = self.compile_language_expression(&expression)?;
579                        let svc = camel_processor::DynamicSetHeader::new(
580                            IdentityProcessor,
581                            key,
582                            move |exchange: &Exchange| {
583                                expression.evaluate(exchange).unwrap_or(Value::Null)
584                            },
585                        );
586                        processors.push((BoxProcessor::new(svc), None));
587                    }
588                },
589                BuilderStep::DeclarativeSetBody { value } => match value {
590                    ValueSourceDef::Literal(value) => {
591                        let body = Self::value_to_body(value);
592                        let svc = camel_processor::SetBody::new(
593                            IdentityProcessor,
594                            move |_exchange: &Exchange| body.clone(),
595                        );
596                        processors.push((BoxProcessor::new(svc), None));
597                    }
598                    ValueSourceDef::Expression(expression) => {
599                        let expression = self.compile_language_expression(&expression)?;
600                        let svc = camel_processor::SetBody::new(
601                            IdentityProcessor,
602                            move |exchange: &Exchange| {
603                                let value = expression.evaluate(exchange).unwrap_or(Value::Null);
604                                Self::value_to_body(value)
605                            },
606                        );
607                        processors.push((BoxProcessor::new(svc), None));
608                    }
609                },
610                BuilderStep::DeclarativeFilter { predicate, steps } => {
611                    let predicate = self.compile_filter_predicate(&predicate)?;
612                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
613                    let sub_processors: Vec<BoxProcessor> =
614                        sub_pairs.into_iter().map(|(p, _)| p).collect();
615                    let sub_pipeline = compose_pipeline(sub_processors);
616                    let svc =
617                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
618                    processors.push((BoxProcessor::new(svc), None));
619                }
620                BuilderStep::DeclarativeChoice { whens, otherwise } => {
621                    let mut when_clauses = Vec::new();
622                    for when_step in whens {
623                        let predicate = self.compile_filter_predicate(&when_step.predicate)?;
624                        let sub_pairs =
625                            self.resolve_steps(when_step.steps, producer_ctx, registry)?;
626                        let sub_processors: Vec<BoxProcessor> =
627                            sub_pairs.into_iter().map(|(p, _)| p).collect();
628                        let pipeline = compose_pipeline(sub_processors);
629                        when_clauses.push(WhenClause {
630                            predicate,
631                            pipeline,
632                        });
633                    }
634                    let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
635                        let sub_pairs =
636                            self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
637                        let sub_processors: Vec<BoxProcessor> =
638                            sub_pairs.into_iter().map(|(p, _)| p).collect();
639                        Some(compose_pipeline(sub_processors))
640                    } else {
641                        None
642                    };
643                    let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
644                    processors.push((BoxProcessor::new(svc), None));
645                }
646                BuilderStep::DeclarativeScript { expression } => {
647                    let lang = self.resolve_language(&expression.language)?;
648                    match lang.create_mutating_expression(&expression.source) {
649                        Ok(mut_expr) => {
650                            processors
651                                .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
652                        }
653                        Err(LanguageError::NotSupported { .. }) => {
654                            // Graceful degradation: YAML declarative routes fall back to read-only
655                            // Expression → SetBody when the language doesn't support MutatingExpression.
656                            // This preserves backwards compatibility for languages like Simple that
657                            // only implement Expression. Contrast with the explicit .script() DSL step
658                            // which hard-errors on NotSupported (user opted in to mutation semantics).
659                            // TODO: add integration test asserting Simple language falls back to
660                            // read-only path (requires full CamelContext test harness).
661                            let expression = self.compile_language_expression(&expression)?;
662                            let svc = camel_processor::SetBody::new(
663                                IdentityProcessor,
664                                move |exchange: &Exchange| {
665                                    let value =
666                                        expression.evaluate(exchange).unwrap_or(Value::Null);
667                                    Self::value_to_body(value)
668                                },
669                            );
670                            processors.push((BoxProcessor::new(svc), None));
671                        }
672                        Err(e) => {
673                            return Err(CamelError::RouteError(format!(
674                                "Failed to create mutating expression for language '{}': {}",
675                                expression.language, e
676                            )));
677                        }
678                    }
679                }
680                BuilderStep::Split { config, steps } => {
681                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
682                    let sub_processors: Vec<BoxProcessor> =
683                        sub_pairs.into_iter().map(|(p, _)| p).collect();
684                    let sub_pipeline = compose_pipeline(sub_processors);
685                    let splitter =
686                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
687                    processors.push((BoxProcessor::new(splitter), None));
688                }
689                BuilderStep::DeclarativeSplit {
690                    expression,
691                    aggregation,
692                    parallel,
693                    parallel_limit,
694                    stop_on_exception,
695                    steps,
696                } => {
697                    let lang_expr = self.compile_language_expression(&expression)?;
698                    let split_fn = move |exchange: &Exchange| {
699                        let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
700                        match value {
701                            Value::String(s) => s
702                                .lines()
703                                .filter(|line| !line.is_empty())
704                                .map(|line| {
705                                    let mut fragment = exchange.clone();
706                                    fragment.input.body = Body::from(line.to_string());
707                                    fragment
708                                })
709                                .collect(),
710                            Value::Array(arr) => arr
711                                .into_iter()
712                                .map(|v| {
713                                    let mut fragment = exchange.clone();
714                                    fragment.input.body = Body::from(v);
715                                    fragment
716                                })
717                                .collect(),
718                            _ => vec![exchange.clone()],
719                        }
720                    };
721
722                    let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
723                        .aggregation(aggregation)
724                        .parallel(parallel)
725                        .stop_on_exception(stop_on_exception);
726                    if let Some(limit) = parallel_limit {
727                        config = config.parallel_limit(limit);
728                    }
729
730                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
731                    let sub_processors: Vec<BoxProcessor> =
732                        sub_pairs.into_iter().map(|(p, _)| p).collect();
733                    let sub_pipeline = compose_pipeline(sub_processors);
734                    let splitter =
735                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
736                    processors.push((BoxProcessor::new(splitter), None));
737                }
738                BuilderStep::Aggregate { config } => {
739                    let (late_tx, _late_rx) = mpsc::channel(256);
740                    let registry: SharedLanguageRegistry =
741                        Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
742                    let cancel = CancellationToken::new();
743                    let svc =
744                        camel_processor::AggregatorService::new(config, late_tx, registry, cancel);
745                    processors.push((BoxProcessor::new(svc), None));
746                }
747                BuilderStep::Filter { predicate, steps } => {
748                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
749                    let sub_processors: Vec<BoxProcessor> =
750                        sub_pairs.into_iter().map(|(p, _)| p).collect();
751                    let sub_pipeline = compose_pipeline(sub_processors);
752                    let svc =
753                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
754                    processors.push((BoxProcessor::new(svc), None));
755                }
756                BuilderStep::Choice { whens, otherwise } => {
757                    // Resolve each when clause's sub-steps into a pipeline.
758                    let mut when_clauses = Vec::new();
759                    for when_step in whens {
760                        let sub_pairs =
761                            self.resolve_steps(when_step.steps, producer_ctx, registry)?;
762                        let sub_processors: Vec<BoxProcessor> =
763                            sub_pairs.into_iter().map(|(p, _)| p).collect();
764                        let pipeline = compose_pipeline(sub_processors);
765                        when_clauses.push(WhenClause {
766                            predicate: when_step.predicate,
767                            pipeline,
768                        });
769                    }
770                    // Resolve otherwise branch (if present).
771                    let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
772                        let sub_pairs =
773                            self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
774                        let sub_processors: Vec<BoxProcessor> =
775                            sub_pairs.into_iter().map(|(p, _)| p).collect();
776                        Some(compose_pipeline(sub_processors))
777                    } else {
778                        None
779                    };
780                    let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
781                    processors.push((BoxProcessor::new(svc), None));
782                }
783                BuilderStep::WireTap { uri } => {
784                    let producer = resolve_producer(&uri)?;
785                    let svc = camel_processor::WireTapService::new(producer);
786                    processors.push((BoxProcessor::new(svc), None));
787                }
788                BuilderStep::Multicast { config, steps } => {
789                    // Each top-level step in the multicast scope becomes an independent endpoint.
790                    let mut endpoints = Vec::new();
791                    for step in steps {
792                        let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
793                        let sub_processors: Vec<BoxProcessor> =
794                            sub_pairs.into_iter().map(|(p, _)| p).collect();
795                        let endpoint = compose_pipeline(sub_processors);
796                        endpoints.push(endpoint);
797                    }
798                    let svc = camel_processor::MulticastService::new(endpoints, config);
799                    processors.push((BoxProcessor::new(svc), None));
800                }
801                BuilderStep::DeclarativeLog { level, message } => {
802                    let ValueSourceDef::Expression(expression) = message else {
803                        // Literal case is already converted to a Processor in compile.rs;
804                        // this arm should never be reached for literals.
805                        unreachable!(
806                            "DeclarativeLog with Literal should have been compiled to a Processor"
807                        );
808                    };
809                    let expression = self.compile_language_expression(&expression)?;
810                    let svc =
811                        camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
812                            expression
813                                .evaluate(exchange)
814                                .unwrap_or_else(|e| {
815                                    warn!(error = %e, "log expression evaluation failed");
816                                    Value::Null
817                                })
818                                .to_string()
819                        });
820                    processors.push((BoxProcessor::new(svc), None));
821                }
822                BuilderStep::Bean { name, method } => {
823                    // Lock beans registry to lookup bean
824                    let beans = self.beans.lock().expect(
825                        "beans mutex poisoned: another thread panicked while holding this lock",
826                    );
827
828                    // Lookup bean by name
829                    let bean = beans.get(&name).ok_or_else(|| {
830                        CamelError::ProcessorError(format!("Bean not found: {}", name))
831                    })?;
832
833                    // Clone Arc for async closure (release lock before async)
834                    let bean_clone = Arc::clone(&bean);
835                    let method = method.clone();
836
837                    // Create processor that invokes bean method
838                    let processor = tower::service_fn(move |mut exchange: Exchange| {
839                        let bean = Arc::clone(&bean_clone);
840                        let method = method.clone();
841
842                        async move {
843                            bean.call(&method, &mut exchange).await?;
844                            Ok(exchange)
845                        }
846                    });
847
848                    processors.push((BoxProcessor::new(processor), None));
849                }
850                BuilderStep::Script { language, script } => {
851                    let lang = self.resolve_language(&language)?;
852                    match lang.create_mutating_expression(&script) {
853                        Ok(mut_expr) => {
854                            processors
855                                .push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
856                        }
857                        Err(LanguageError::NotSupported {
858                            feature,
859                            language: ref lang_name,
860                        }) => {
861                            // Hard error: the .script() DSL step explicitly requests mutation semantics.
862                            // If the language doesn't support MutatingExpression, the route is mis-configured.
863                            return Err(CamelError::RouteError(format!(
864                                "Language '{}' does not support {} (required for .script() step)",
865                                lang_name, feature
866                            )));
867                        }
868                        Err(e) => {
869                            return Err(CamelError::RouteError(format!(
870                                "Failed to create mutating expression for language '{}': {}",
871                                language, e
872                            )));
873                        }
874                    }
875                }
876                BuilderStep::Throttle { config, steps } => {
877                    let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
878                    let sub_processors: Vec<BoxProcessor> =
879                        sub_pairs.into_iter().map(|(p, _)| p).collect();
880                    let sub_pipeline = compose_pipeline(sub_processors);
881                    let svc =
882                        camel_processor::throttler::ThrottlerService::new(config, sub_pipeline);
883                    processors.push((BoxProcessor::new(svc), None));
884                }
885                BuilderStep::LoadBalance { config, steps } => {
886                    // Each top-level step in the load_balance scope becomes an independent endpoint.
887                    let mut endpoints = Vec::new();
888                    for step in steps {
889                        let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
890                        let sub_processors: Vec<BoxProcessor> =
891                            sub_pairs.into_iter().map(|(p, _)| p).collect();
892                        let endpoint = compose_pipeline(sub_processors);
893                        endpoints.push(endpoint);
894                    }
895                    let svc =
896                        camel_processor::load_balancer::LoadBalancerService::new(endpoints, config);
897                    processors.push((BoxProcessor::new(svc), None));
898                }
899                BuilderStep::DynamicRouter { config } => {
900                    use camel_api::EndpointResolver;
901
902                    let producer_ctx_clone = producer_ctx.clone();
903                    let registry_clone = Arc::clone(registry);
904                    let resolver: EndpointResolver = Arc::new(move |uri: &str| {
905                        let parsed = match parse_uri(uri) {
906                            Ok(p) => p,
907                            Err(_) => return None,
908                        };
909                        let registry_guard = match registry_clone.lock() {
910                            Ok(g) => g,
911                            Err(_) => return None, // mutex poisoned
912                        };
913                        let component = match registry_guard.get_or_err(&parsed.scheme) {
914                            Ok(c) => c,
915                            Err(_) => return None,
916                        };
917                        let endpoint = match component.create_endpoint(uri) {
918                            Ok(e) => e,
919                            Err(_) => return None,
920                        };
921                        let producer = match endpoint.create_producer(&producer_ctx_clone) {
922                            Ok(p) => p,
923                            Err(_) => return None,
924                        };
925                        Some(BoxProcessor::new(producer))
926                    });
927                    let svc = camel_processor::dynamic_router::DynamicRouterService::new(
928                        config, resolver,
929                    );
930                    processors.push((BoxProcessor::new(svc), None));
931                }
932                BuilderStep::DeclarativeDynamicRouter {
933                    expression,
934                    uri_delimiter,
935                    cache_size,
936                    ignore_invalid_endpoints,
937                    max_iterations,
938                } => {
939                    use camel_api::EndpointResolver;
940
941                    let expression = self.compile_language_expression(&expression)?;
942                    let expression: camel_api::RouterExpression =
943                        Arc::new(move |exchange: &Exchange| {
944                            let value = expression.evaluate(exchange).unwrap_or(Value::Null);
945                            match value {
946                                Value::Null => None,
947                                Value::String(s) => Some(s),
948                                other => Some(other.to_string()),
949                            }
950                        });
951
952                    // Note: timeout defaults to 60s (DynamicRouterConfig::new default).
953                    // Apache Camel does not expose a timeout option on dynamicRouter —
954                    // our timeout is a rust-camel extension.
955                    let config = camel_api::DynamicRouterConfig::new(expression)
956                        .uri_delimiter(uri_delimiter)
957                        .cache_size(cache_size)
958                        .ignore_invalid_endpoints(ignore_invalid_endpoints)
959                        .max_iterations(max_iterations);
960
961                    let producer_ctx_clone = producer_ctx.clone();
962                    let registry_clone = Arc::clone(registry);
963                    let resolver: EndpointResolver = Arc::new(move |uri: &str| {
964                        let parsed = match parse_uri(uri) {
965                            Ok(p) => p,
966                            Err(_) => return None,
967                        };
968                        let registry_guard = match registry_clone.lock() {
969                            Ok(g) => g,
970                            Err(_) => return None,
971                        };
972                        let component = match registry_guard.get_or_err(&parsed.scheme) {
973                            Ok(c) => c,
974                            Err(_) => return None,
975                        };
976                        let endpoint = match component.create_endpoint(uri) {
977                            Ok(e) => e,
978                            Err(_) => return None,
979                        };
980                        let producer = match endpoint.create_producer(&producer_ctx_clone) {
981                            Ok(p) => p,
982                            Err(_) => return None,
983                        };
984                        Some(BoxProcessor::new(producer))
985                    });
986                    let svc = camel_processor::dynamic_router::DynamicRouterService::new(
987                        config, resolver,
988                    );
989                    processors.push((BoxProcessor::new(svc), None));
990                }
991                BuilderStep::RoutingSlip { config } => {
992                    use camel_api::EndpointResolver;
993
994                    let producer_ctx_clone = producer_ctx.clone();
995                    let registry_clone = Arc::clone(registry);
996                    let resolver: EndpointResolver = Arc::new(move |uri: &str| {
997                        let parsed = match parse_uri(uri) {
998                            Ok(p) => p,
999                            Err(_) => return None,
1000                        };
1001                        let registry_guard = match registry_clone.lock() {
1002                            Ok(g) => g,
1003                            Err(_) => return None,
1004                        };
1005                        let component = match registry_guard.get_or_err(&parsed.scheme) {
1006                            Ok(c) => c,
1007                            Err(_) => return None,
1008                        };
1009                        let endpoint = match component.create_endpoint(uri) {
1010                            Ok(e) => e,
1011                            Err(_) => return None,
1012                        };
1013                        let producer = match endpoint.create_producer(&producer_ctx_clone) {
1014                            Ok(p) => p,
1015                            Err(_) => return None,
1016                        };
1017                        Some(BoxProcessor::new(producer))
1018                    });
1019
1020                    let svc =
1021                        camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
1022                    processors.push((BoxProcessor::new(svc), None));
1023                }
1024                BuilderStep::DeclarativeRoutingSlip {
1025                    expression,
1026                    uri_delimiter,
1027                    cache_size,
1028                    ignore_invalid_endpoints,
1029                } => {
1030                    use camel_api::EndpointResolver;
1031
1032                    let expression = self.compile_language_expression(&expression)?;
1033                    let expression: camel_api::RoutingSlipExpression =
1034                        Arc::new(move |exchange: &Exchange| {
1035                            let value = expression.evaluate(exchange).unwrap_or(Value::Null);
1036                            match value {
1037                                Value::Null => None,
1038                                Value::String(s) => Some(s),
1039                                other => Some(other.to_string()),
1040                            }
1041                        });
1042
1043                    let config = camel_api::RoutingSlipConfig::new(expression)
1044                        .uri_delimiter(uri_delimiter)
1045                        .cache_size(cache_size)
1046                        .ignore_invalid_endpoints(ignore_invalid_endpoints);
1047
1048                    let producer_ctx_clone = producer_ctx.clone();
1049                    let registry_clone = Arc::clone(registry);
1050                    let resolver: EndpointResolver = Arc::new(move |uri: &str| {
1051                        let parsed = match parse_uri(uri) {
1052                            Ok(p) => p,
1053                            Err(_) => return None,
1054                        };
1055                        let registry_guard = match registry_clone.lock() {
1056                            Ok(g) => g,
1057                            Err(_) => return None,
1058                        };
1059                        let component = match registry_guard.get_or_err(&parsed.scheme) {
1060                            Ok(c) => c,
1061                            Err(_) => return None,
1062                        };
1063                        let endpoint = match component.create_endpoint(uri) {
1064                            Ok(e) => e,
1065                            Err(_) => return None,
1066                        };
1067                        let producer = match endpoint.create_producer(&producer_ctx_clone) {
1068                            Ok(p) => p,
1069                            Err(_) => return None,
1070                        };
1071                        Some(BoxProcessor::new(producer))
1072                    });
1073
1074                    let svc =
1075                        camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
1076                    processors.push((BoxProcessor::new(svc), None));
1077                }
1078            }
1079        }
1080        Ok(processors)
1081    }
1082
1083    /// Add a route definition to the controller.
1084    ///
1085    /// Steps are resolved immediately using the registry.
1086    ///
1087    /// # Errors
1088    ///
1089    /// Returns an error if:
1090    /// - A route with the same ID already exists
1091    /// - Step resolution fails
1092    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
1093        let route_id = definition.route_id().to_string();
1094
1095        if self.routes.contains_key(&route_id) {
1096            return Err(CamelError::RouteError(format!(
1097                "Route '{}' already exists",
1098                route_id
1099            )));
1100        }
1101
1102        info!(route_id = %route_id, "Adding route to controller");
1103
1104        // Extract definition info for storage before steps are consumed
1105        let definition_info = definition.to_info();
1106        let RouteDefinition {
1107            from_uri,
1108            steps,
1109            error_handler,
1110            circuit_breaker,
1111            unit_of_work,
1112            concurrency,
1113            ..
1114        } = definition;
1115
1116        // Create ProducerContext from self_ref for step resolution
1117        let producer_ctx = self.build_producer_context()?;
1118
1119        // Take ownership of steps before resolve_steps consumes them
1120        let mut aggregate_split: Option<AggregateSplitInfo> = None;
1121        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
1122            Some((idx, agg_config)) => {
1123                let mut pre_steps = steps;
1124                let mut rest = pre_steps.split_off(idx);
1125                let _agg_step = rest.remove(0);
1126                let post_steps = rest;
1127
1128                let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
1129                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
1130                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
1131                    compose_pipeline(pre_procs),
1132                )));
1133
1134                let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
1135                let post_procs: Vec<BoxProcessor> =
1136                    post_pairs.into_iter().map(|(p, _)| p).collect();
1137                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
1138                    compose_pipeline(post_procs),
1139                )));
1140
1141                aggregate_split = Some(AggregateSplitInfo {
1142                    pre_pipeline,
1143                    agg_config,
1144                    post_pipeline,
1145                });
1146
1147                vec![]
1148            }
1149            None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
1150        };
1151        let route_id_for_tracing = route_id.clone();
1152        let mut pipeline = if processors_with_contracts.is_empty() {
1153            BoxProcessor::new(IdentityProcessor)
1154        } else {
1155            compose_traced_pipeline_with_contracts(
1156                processors_with_contracts,
1157                &route_id_for_tracing,
1158                self.tracing_enabled,
1159                self.tracer_detail_level.clone(),
1160                self.tracer_metrics.clone(),
1161            )
1162        };
1163
1164        // Apply circuit breaker if configured
1165        if let Some(cb_config) = circuit_breaker {
1166            let cb_layer = CircuitBreakerLayer::new(cb_config);
1167            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
1168        }
1169
1170        // Determine which error handler config to use (per-route takes precedence)
1171        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
1172
1173        if let Some(config) = eh_config {
1174            // Lock registry for error handler resolution
1175            let registry = self
1176                .registry
1177                .lock()
1178                .expect("mutex poisoned: another thread panicked while holding this lock");
1179            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
1180            pipeline = BoxProcessor::new(layer.layer(pipeline));
1181        }
1182
1183        // Apply UoW layer outermost (after error handler)
1184        let uow_counter = if let Some(uow_config) = &unit_of_work {
1185            let registry = self
1186                .registry
1187                .lock()
1188                .expect("mutex poisoned: registry lock in add_route uow");
1189            let (uow_layer, counter) =
1190                self.resolve_uow_layer(uow_config, &producer_ctx, &registry, None)?;
1191            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1192            Some(counter)
1193        } else {
1194            None
1195        };
1196
1197        self.routes.insert(
1198            route_id.clone(),
1199            ManagedRoute {
1200                definition: definition_info,
1201                from_uri,
1202                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
1203                concurrency,
1204                consumer_handle: None,
1205                pipeline_handle: None,
1206                consumer_cancel_token: CancellationToken::new(),
1207                pipeline_cancel_token: CancellationToken::new(),
1208                channel_sender: None,
1209                in_flight: uow_counter,
1210                aggregate_split,
1211                agg_service: None,
1212            },
1213        );
1214
1215        Ok(())
1216    }
1217
1218    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting into the route map.
1219    ///
1220    /// Used by hot-reload to prepare a new pipeline for atomic swap without disrupting
1221    /// the running route. The caller is responsible for swapping via `swap_pipeline`.
1222    pub fn compile_route_definition(
1223        &self,
1224        def: RouteDefinition,
1225    ) -> Result<BoxProcessor, CamelError> {
1226        let route_id = def.route_id().to_string();
1227
1228        let producer_ctx = self.build_producer_context()?;
1229
1230        let processors_with_contracts =
1231            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
1232        let mut pipeline = compose_traced_pipeline_with_contracts(
1233            processors_with_contracts,
1234            &route_id,
1235            self.tracing_enabled,
1236            self.tracer_detail_level.clone(),
1237            self.tracer_metrics.clone(),
1238        );
1239
1240        if let Some(cb_config) = def.circuit_breaker {
1241            let cb_layer = CircuitBreakerLayer::new(cb_config);
1242            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
1243        }
1244
1245        let eh_config = def
1246            .error_handler
1247            .clone()
1248            .or_else(|| self.global_error_handler.clone());
1249        if let Some(config) = eh_config {
1250            // Lock registry for error handler resolution
1251            let registry = self
1252                .registry
1253                .lock()
1254                .expect("mutex poisoned: registry lock in compile_route_definition");
1255            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
1256            pipeline = BoxProcessor::new(layer.layer(pipeline));
1257        }
1258
1259        // Apply UoW layer outermost
1260        if let Some(uow_config) = &def.unit_of_work {
1261            let existing_counter = self
1262                .routes
1263                .get(&route_id)
1264                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
1265
1266            let registry = self
1267                .registry
1268                .lock()
1269                .expect("mutex poisoned: registry lock in compile_route_definition uow");
1270
1271            let (uow_layer, _counter) =
1272                self.resolve_uow_layer(uow_config, &producer_ctx, &registry, existing_counter)?;
1273
1274            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
1275        }
1276
1277        Ok(pipeline)
1278    }
1279
1280    /// Remove a route from the controller map.
1281    ///
1282    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
1283    /// Returns an error if the route is still running or does not exist.
1284    /// Does not cancel any running tasks — call `stop_route` first.
1285    pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1286        let managed = self.routes.get(route_id).ok_or_else(|| {
1287            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
1288        })?;
1289        if handle_is_running(&managed.consumer_handle)
1290            || handle_is_running(&managed.pipeline_handle)
1291        {
1292            return Err(CamelError::RouteError(format!(
1293                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
1294                route_id,
1295                inferred_lifecycle_label(managed)
1296            )));
1297        }
1298        self.routes.remove(route_id);
1299        info!(route_id = %route_id, "Route removed from controller");
1300        Ok(())
1301    }
1302
1303    /// Returns the number of routes in the controller.
1304    pub fn route_count(&self) -> usize {
1305        self.routes.len()
1306    }
1307
1308    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
1309        self.routes.get(route_id).map(|r| {
1310            r.in_flight
1311                .as_ref()
1312                .map_or(0, |c| c.load(Ordering::Relaxed))
1313        })
1314    }
1315
1316    /// Returns `true` if a route with the given ID exists.
1317    pub fn route_exists(&self, route_id: &str) -> bool {
1318        self.routes.contains_key(route_id)
1319    }
1320
1321    /// Returns all route IDs.
1322    pub fn route_ids(&self) -> Vec<String> {
1323        self.routes.keys().cloned().collect()
1324    }
1325
1326    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
1327    pub fn auto_startup_route_ids(&self) -> Vec<String> {
1328        let mut pairs: Vec<(String, i32)> = self
1329            .routes
1330            .iter()
1331            .filter(|(_, managed)| managed.definition.auto_startup())
1332            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1333            .collect();
1334        pairs.sort_by_key(|(_, order)| *order);
1335        pairs.into_iter().map(|(id, _)| id).collect()
1336    }
1337
1338    /// Returns route IDs sorted by shutdown order (startup order descending).
1339    pub fn shutdown_route_ids(&self) -> Vec<String> {
1340        let mut pairs: Vec<(String, i32)> = self
1341            .routes
1342            .iter()
1343            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
1344            .collect();
1345        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1346        pairs.into_iter().map(|(id, _)| id).collect()
1347    }
1348
1349    /// Atomically swap the pipeline of a route.
1350    ///
1351    /// In-flight requests finish with the old pipeline (kept alive by Arc).
1352    /// New requests immediately use the new pipeline.
1353    pub fn swap_pipeline(
1354        &self,
1355        route_id: &str,
1356        new_pipeline: BoxProcessor,
1357    ) -> Result<(), CamelError> {
1358        let managed = self
1359            .routes
1360            .get(route_id)
1361            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1362
1363        if managed.aggregate_split.is_some() {
1364            tracing::warn!(
1365                route_id = %route_id,
1366                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
1367            );
1368        }
1369
1370        managed
1371            .pipeline
1372            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1373        info!(route_id = %route_id, "Pipeline swapped atomically");
1374        Ok(())
1375    }
1376
1377    /// Returns the from_uri of a route, if it exists.
1378    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1379        self.routes.get(route_id).map(|r| r.from_uri.clone())
1380    }
1381
1382    /// Get a clone of the current pipeline for a route.
1383    ///
1384    /// This is useful for testing and introspection.
1385    /// Returns `None` if the route doesn't exist.
1386    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1387        self.routes
1388            .get(route_id)
1389            .map(|r| r.pipeline.load().0.clone())
1390    }
1391
1392    /// Internal stop implementation that can set custom status.
1393    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1394        let managed = self
1395            .routes
1396            .get_mut(route_id)
1397            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1398
1399        if !handle_is_running(&managed.consumer_handle)
1400            && !handle_is_running(&managed.pipeline_handle)
1401        {
1402            return Ok(());
1403        }
1404
1405        info!(route_id = %route_id, "Stopping route");
1406
1407        // Cancel both tokens to signal shutdown for consumer and pipeline independently
1408        let managed = self
1409            .routes
1410            .get_mut(route_id)
1411            .expect("invariant: route must exist after prior existence check");
1412        managed.consumer_cancel_token.cancel();
1413
1414        // Aggregator v2: force-complete pending buckets before cancelling pipeline
1415        let managed = self
1416            .routes
1417            .get_mut(route_id)
1418            .expect("invariant: route must exist after prior existence check");
1419        if let Some(agg_svc) = &managed.agg_service {
1420            let guard = agg_svc.lock().unwrap();
1421            guard.force_complete_all();
1422        }
1423
1424        let managed = self
1425            .routes
1426            .get_mut(route_id)
1427            .expect("invariant: route must exist after prior existence check");
1428        managed.pipeline_cancel_token.cancel();
1429
1430        // Take handles directly (no Arc<Mutex> wrapper needed)
1431        let managed = self
1432            .routes
1433            .get_mut(route_id)
1434            .expect("invariant: route must exist after prior existence check");
1435        let consumer_handle = managed.consumer_handle.take();
1436        let pipeline_handle = managed.pipeline_handle.take();
1437
1438        // IMPORTANT: Drop channel_sender early so rx.recv() returns None
1439        // This ensures the pipeline task can exit even if idle on recv()
1440        let managed = self
1441            .routes
1442            .get_mut(route_id)
1443            .expect("invariant: route must exist after prior existence check");
1444        managed.channel_sender = None;
1445
1446        // Wait for tasks to complete with timeout
1447        // The CancellationToken already signaled tasks to stop gracefully.
1448        // Combined with the select! in pipeline loops, this should exit quickly.
1449        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1450            match (consumer_handle, pipeline_handle) {
1451                (Some(c), Some(p)) => {
1452                    let _ = tokio::join!(c, p);
1453                }
1454                (Some(c), None) => {
1455                    let _ = c.await;
1456                }
1457                (None, Some(p)) => {
1458                    let _ = p.await;
1459                }
1460                (None, None) => {}
1461            }
1462        })
1463        .await;
1464
1465        if timeout_result.is_err() {
1466            warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
1467        }
1468
1469        // Get the managed route again (can't hold across await)
1470        let managed = self
1471            .routes
1472            .get_mut(route_id)
1473            .expect("invariant: route must exist after prior existence check");
1474
1475        // Create fresh cancellation tokens for next start
1476        managed.consumer_cancel_token = CancellationToken::new();
1477        managed.pipeline_cancel_token = CancellationToken::new();
1478
1479        info!(route_id = %route_id, "Route stopped");
1480        Ok(())
1481    }
1482}
1483
1484#[async_trait::async_trait]
1485impl RouteController for DefaultRouteController {
1486    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1487        // Check if route exists and can be started.
1488        {
1489            let managed = self
1490                .routes
1491                .get_mut(route_id)
1492                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1493
1494            let consumer_running = handle_is_running(&managed.consumer_handle);
1495            let pipeline_running = handle_is_running(&managed.pipeline_handle);
1496            if consumer_running && pipeline_running {
1497                return Ok(());
1498            }
1499            if !consumer_running && pipeline_running {
1500                return Err(CamelError::RouteError(format!(
1501                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
1502                    route_id
1503                )));
1504            }
1505            if consumer_running && !pipeline_running {
1506                return Err(CamelError::RouteError(format!(
1507                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
1508                    route_id
1509                )));
1510            }
1511        }
1512
1513        info!(route_id = %route_id, "Starting route");
1514
1515        // Get the resolved route info
1516        let (from_uri, pipeline, concurrency) = {
1517            let managed = self
1518                .routes
1519                .get(route_id)
1520                .expect("invariant: route must exist after prior existence check");
1521            (
1522                managed.from_uri.clone(),
1523                Arc::clone(&managed.pipeline),
1524                managed.concurrency.clone(),
1525            )
1526        };
1527
1528        // Clone crash notifier for consumer task
1529        let crash_notifier = self.crash_notifier.clone();
1530        let runtime_for_consumer = self.runtime.clone();
1531
1532        // Parse from URI and create consumer (lock registry for lookup)
1533        let parsed = parse_uri(&from_uri)?;
1534        let registry = self
1535            .registry
1536            .lock()
1537            .expect("mutex poisoned: another thread panicked while holding this lock");
1538        let component = registry.get_or_err(&parsed.scheme)?;
1539        let endpoint = component.create_endpoint(&from_uri)?;
1540        let mut consumer = endpoint.create_consumer()?;
1541        let consumer_concurrency = consumer.concurrency_model();
1542        // Drop the lock before spawning tasks
1543        drop(registry);
1544
1545        // Resolve effective concurrency: route override > consumer default
1546        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1547
1548        // Get the managed route for mutation
1549        let managed = self
1550            .routes
1551            .get_mut(route_id)
1552            .expect("invariant: route must exist after prior existence check");
1553
1554        // Create channel for consumer to send exchanges
1555        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1556        // Create child tokens for independent lifecycle control
1557        let consumer_cancel = managed.consumer_cancel_token.child_token();
1558        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1559        // Clone sender for storage (to reuse on resume)
1560        let tx_for_storage = tx.clone();
1561        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1562
1563        // --- Aggregator v2: check for aggregate route with timeout ---
1564        let managed = self
1565            .routes
1566            .get_mut(route_id)
1567            .expect("invariant: route must exist after prior existence check");
1568
1569        if let Some(split) = managed.aggregate_split.as_ref() {
1570            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
1571
1572            let route_cancel_clone = pipeline_cancel.clone();
1573            let svc = AggregatorService::new(
1574                split.agg_config.clone(),
1575                late_tx,
1576                Arc::clone(&self.languages),
1577                route_cancel_clone,
1578            );
1579            let agg = Arc::new(std::sync::Mutex::new(svc));
1580
1581            managed.agg_service = Some(Arc::clone(&agg));
1582
1583            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
1584            let pre_pipeline = Arc::clone(&split.pre_pipeline);
1585            let post_pipeline = Arc::clone(&split.post_pipeline);
1586
1587            // Spawn consumer task (same as normal route)
1588            let route_id_for_consumer = route_id.to_string();
1589            let consumer_handle = tokio::spawn(async move {
1590                if let Err(e) = consumer.start(consumer_ctx).await {
1591                    error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
1592                    let error_msg = e.to_string();
1593                    if let Some(tx) = crash_notifier {
1594                        let _ = tx
1595                            .send(CrashNotification {
1596                                route_id: route_id_for_consumer.clone(),
1597                                error: error_msg.clone(),
1598                            })
1599                            .await;
1600                    }
1601                    publish_runtime_failure(
1602                        runtime_for_consumer,
1603                        &route_id_for_consumer,
1604                        &error_msg,
1605                    )
1606                    .await;
1607                }
1608            });
1609
1610            // Spawn biased select forward loop
1611            let pipeline_handle = tokio::spawn(async move {
1612                loop {
1613                    tokio::select! {
1614                        biased;
1615
1616                        late_ex = async {
1617                            let mut rx = late_rx.lock().await;
1618                            rx.recv().await
1619                        } => {
1620                            match late_ex {
1621                                Some(ex) => {
1622                                    let pipe = post_pipeline.load();
1623                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
1624                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
1625                                    }
1626                                }
1627                                None => return,
1628                            }
1629                        }
1630
1631                        envelope_opt = rx.recv() => {
1632                            match envelope_opt {
1633                                Some(envelope) => {
1634                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
1635                                    let pre_pipe = pre_pipeline.load();
1636                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
1637                                        Ok(ex) => ex,
1638                                        Err(e) => {
1639                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1640                                            continue;
1641                                        }
1642                                    };
1643
1644                                    let ex = {
1645                                        let cloned_svc = agg.lock().unwrap().clone();
1646                                        cloned_svc.oneshot(ex).await
1647                                    };
1648
1649                                    match ex {
1650                                        Ok(ex) => {
1651                                            if !is_pending(&ex) {
1652                                                let post_pipe = post_pipeline.load();
1653                                                let out = post_pipe.0.clone().oneshot(ex).await;
1654                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
1655                                            } else if let Some(tx) = reply_tx {
1656                                                let _ = tx.send(Ok(ex));
1657                                            }
1658                                        }
1659                                        Err(e) => {
1660                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
1661                                        }
1662                                    }
1663                                }
1664                                None => return,
1665                            }
1666                        }
1667
1668                        _ = pipeline_cancel.cancelled() => {
1669                            {
1670                                let guard = agg.lock().unwrap();
1671                                guard.force_complete_all();
1672                            }
1673                            let mut rx_guard = late_rx.lock().await;
1674                            while let Ok(late_ex) = rx_guard.try_recv() {
1675                                let pipe = post_pipeline.load();
1676                                let _ = pipe.0.clone().oneshot(late_ex).await;
1677                            }
1678                            break;
1679                        }
1680                    }
1681                }
1682            });
1683
1684            let managed = self
1685                .routes
1686                .get_mut(route_id)
1687                .expect("invariant: route must exist");
1688            managed.consumer_handle = Some(consumer_handle);
1689            managed.pipeline_handle = Some(pipeline_handle);
1690            managed.channel_sender = Some(tx_for_storage);
1691
1692            info!(route_id = %route_id, "Route started (aggregate with timeout)");
1693            return Ok(());
1694        }
1695        // --- End aggregator v2 branch ---
1696
1697        // Start consumer in background task.
1698        let route_id_for_consumer = route_id.to_string();
1699        let consumer_handle = tokio::spawn(async move {
1700            if let Err(e) = consumer.start(consumer_ctx).await {
1701                error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
1702                let error_msg = e.to_string();
1703
1704                // Send crash notification if notifier is configured
1705                if let Some(tx) = crash_notifier {
1706                    let _ = tx
1707                        .send(CrashNotification {
1708                            route_id: route_id_for_consumer.clone(),
1709                            error: error_msg.clone(),
1710                        })
1711                        .await;
1712                }
1713
1714                publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1715                    .await;
1716            }
1717        });
1718
1719        // Spawn pipeline task with its own cancellation token
1720        let pipeline_handle = match effective_concurrency {
1721            ConcurrencyModel::Sequential => {
1722                tokio::spawn(async move {
1723                    loop {
1724                        // Use select! to exit promptly on cancellation even when idle
1725                        let envelope = tokio::select! {
1726                            envelope = rx.recv() => match envelope {
1727                                Some(e) => e,
1728                                None => return, // Channel closed
1729                            },
1730                            _ = pipeline_cancel.cancelled() => {
1731                                // Cancellation requested - exit gracefully
1732                                return;
1733                            }
1734                        };
1735                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1736
1737                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
1738                        let mut pipeline = pipeline.load().0.clone();
1739
1740                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1741                            if let Some(tx) = reply_tx {
1742                                let _ = tx.send(Err(e));
1743                            }
1744                            return;
1745                        }
1746
1747                        let result = pipeline.call(exchange).await;
1748                        if let Some(tx) = reply_tx {
1749                            let _ = tx.send(result);
1750                        } else if let Err(ref e) = result
1751                            && !matches!(e, CamelError::Stopped)
1752                        {
1753                            error!("Pipeline error: {e}");
1754                        }
1755                    }
1756                })
1757            }
1758            ConcurrencyModel::Concurrent { max } => {
1759                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1760                tokio::spawn(async move {
1761                    loop {
1762                        // Use select! to exit promptly on cancellation even when idle
1763                        let envelope = tokio::select! {
1764                            envelope = rx.recv() => match envelope {
1765                                Some(e) => e,
1766                                None => return, // Channel closed
1767                            },
1768                            _ = pipeline_cancel.cancelled() => {
1769                                // Cancellation requested - exit gracefully
1770                                return;
1771                            }
1772                        };
1773                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1774                        let pipe_ref = Arc::clone(&pipeline);
1775                        let sem = sem.clone();
1776                        let cancel = pipeline_cancel.clone();
1777                        tokio::spawn(async move {
1778                            // Acquire semaphore permit if bounded
1779                            let _permit = match &sem {
1780                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
1781                                None => None,
1782                            };
1783
1784                            // Load current pipeline from ArcSwap
1785                            let mut pipe = pipe_ref.load().0.clone();
1786
1787                            // Wait for service ready with circuit breaker backoff
1788                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1789                                if let Some(tx) = reply_tx {
1790                                    let _ = tx.send(Err(e));
1791                                }
1792                                return;
1793                            }
1794
1795                            let result = pipe.call(exchange).await;
1796                            if let Some(tx) = reply_tx {
1797                                let _ = tx.send(result);
1798                            } else if let Err(ref e) = result
1799                                && !matches!(e, CamelError::Stopped)
1800                            {
1801                                error!("Pipeline error: {e}");
1802                            }
1803                        });
1804                    }
1805                })
1806            }
1807        };
1808
1809        // Store handles and update status
1810        let managed = self
1811            .routes
1812            .get_mut(route_id)
1813            .expect("invariant: route must exist after prior existence check");
1814        managed.consumer_handle = Some(consumer_handle);
1815        managed.pipeline_handle = Some(pipeline_handle);
1816        managed.channel_sender = Some(tx_for_storage);
1817
1818        info!(route_id = %route_id, "Route started");
1819        Ok(())
1820    }
1821
1822    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1823        self.stop_route_internal(route_id).await
1824    }
1825
1826    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1827        self.stop_route(route_id).await?;
1828        tokio::time::sleep(Duration::from_millis(100)).await;
1829        self.start_route(route_id).await
1830    }
1831
1832    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1833        // Check route exists and state.
1834        let managed = self
1835            .routes
1836            .get_mut(route_id)
1837            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1838
1839        let consumer_running = handle_is_running(&managed.consumer_handle);
1840        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1841
1842        // Can only suspend from active started state.
1843        if !consumer_running || !pipeline_running {
1844            return Err(CamelError::RouteError(format!(
1845                "Cannot suspend route '{}' with execution lifecycle {}",
1846                route_id,
1847                inferred_lifecycle_label(managed)
1848            )));
1849        }
1850
1851        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1852
1853        // Cancel consumer token only (keep pipeline running)
1854        let managed = self
1855            .routes
1856            .get_mut(route_id)
1857            .expect("invariant: route must exist after prior existence check");
1858        managed.consumer_cancel_token.cancel();
1859
1860        // Take and join consumer handle
1861        let managed = self
1862            .routes
1863            .get_mut(route_id)
1864            .expect("invariant: route must exist after prior existence check");
1865        let consumer_handle = managed.consumer_handle.take();
1866
1867        // Wait for consumer task to complete with timeout
1868        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1869            if let Some(handle) = consumer_handle {
1870                let _ = handle.await;
1871            }
1872        })
1873        .await;
1874
1875        if timeout_result.is_err() {
1876            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1877        }
1878
1879        // Get the managed route again (can't hold across await)
1880        let managed = self
1881            .routes
1882            .get_mut(route_id)
1883            .expect("invariant: route must exist after prior existence check");
1884
1885        // Create fresh cancellation token for consumer (for resume)
1886        managed.consumer_cancel_token = CancellationToken::new();
1887
1888        info!(route_id = %route_id, "Route suspended (pipeline still running)");
1889        Ok(())
1890    }
1891
1892    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1893        // Check route exists and is Suspended-equivalent execution state.
1894        let managed = self
1895            .routes
1896            .get(route_id)
1897            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1898
1899        let consumer_running = handle_is_running(&managed.consumer_handle);
1900        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1901        if consumer_running || !pipeline_running {
1902            return Err(CamelError::RouteError(format!(
1903                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1904                route_id,
1905                inferred_lifecycle_label(managed)
1906            )));
1907        }
1908
1909        // Get the stored channel sender (must exist for a suspended route)
1910        let sender = managed.channel_sender.clone().ok_or_else(|| {
1911            CamelError::RouteError("Suspended route has no channel sender".into())
1912        })?;
1913
1914        // Get from_uri and concurrency for creating new consumer
1915        let from_uri = managed.from_uri.clone();
1916
1917        info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1918
1919        // Parse from URI and create consumer (lock registry for lookup)
1920        let parsed = parse_uri(&from_uri)?;
1921        let registry = self
1922            .registry
1923            .lock()
1924            .expect("mutex poisoned: another thread panicked while holding this lock");
1925        let component = registry.get_or_err(&parsed.scheme)?;
1926        let endpoint = component.create_endpoint(&from_uri)?;
1927        let mut consumer = endpoint.create_consumer()?;
1928        // Drop the lock before spawning tasks
1929        drop(registry);
1930
1931        // Get the managed route for mutation
1932        let managed = self
1933            .routes
1934            .get_mut(route_id)
1935            .expect("invariant: route must exist after prior existence check");
1936
1937        // Create child token for consumer lifecycle
1938        let consumer_cancel = managed.consumer_cancel_token.child_token();
1939
1940        let crash_notifier = self.crash_notifier.clone();
1941        let runtime_for_consumer = self.runtime.clone();
1942
1943        // Create ConsumerContext with the stored sender
1944        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1945
1946        // Spawn consumer task
1947        let route_id_for_consumer = route_id.to_string();
1948        let consumer_handle = tokio::spawn(async move {
1949            if let Err(e) = consumer.start(consumer_ctx).await {
1950                error!(route_id = %route_id_for_consumer, "Consumer error on resume: {e}");
1951                let error_msg = e.to_string();
1952
1953                // Send crash notification if notifier is configured
1954                if let Some(tx) = crash_notifier {
1955                    let _ = tx
1956                        .send(CrashNotification {
1957                            route_id: route_id_for_consumer.clone(),
1958                            error: error_msg.clone(),
1959                        })
1960                        .await;
1961                }
1962
1963                publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1964                    .await;
1965            }
1966        });
1967
1968        // Store consumer handle and update status
1969        let managed = self
1970            .routes
1971            .get_mut(route_id)
1972            .expect("invariant: route must exist after prior existence check");
1973        managed.consumer_handle = Some(consumer_handle);
1974
1975        info!(route_id = %route_id, "Route resumed");
1976        Ok(())
1977    }
1978
1979    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1980        // Only start routes where auto_startup() == true
1981        // Sort by startup_order() ascending before starting
1982        let route_ids: Vec<String> = {
1983            let mut pairs: Vec<_> = self
1984                .routes
1985                .iter()
1986                .filter(|(_, r)| r.definition.auto_startup())
1987                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1988                .collect();
1989            pairs.sort_by_key(|(_, order)| *order);
1990            pairs.into_iter().map(|(id, _)| id).collect()
1991        };
1992
1993        info!("Starting {} auto-startup routes", route_ids.len());
1994
1995        // Collect errors but continue starting remaining routes
1996        let mut errors: Vec<String> = Vec::new();
1997        for route_id in route_ids {
1998            if let Err(e) = self.start_route(&route_id).await {
1999                errors.push(format!("Route '{}': {}", route_id, e));
2000            }
2001        }
2002
2003        if !errors.is_empty() {
2004            return Err(CamelError::RouteError(format!(
2005                "Failed to start routes: {}",
2006                errors.join(", ")
2007            )));
2008        }
2009
2010        info!("All auto-startup routes started");
2011        Ok(())
2012    }
2013
2014    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
2015        // Sort by startup_order descending (reverse order)
2016        let route_ids: Vec<String> = {
2017            let mut pairs: Vec<_> = self
2018                .routes
2019                .iter()
2020                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
2021                .collect();
2022            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
2023            pairs.into_iter().map(|(id, _)| id).collect()
2024        };
2025
2026        info!("Stopping {} routes", route_ids.len());
2027
2028        for route_id in route_ids {
2029            let _ = self.stop_route(&route_id).await;
2030        }
2031
2032        info!("All routes stopped");
2033        Ok(())
2034    }
2035}
2036
2037#[async_trait::async_trait]
2038impl RouteControllerInternal for DefaultRouteController {
2039    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
2040        DefaultRouteController::add_route(self, def)
2041    }
2042
2043    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError> {
2044        DefaultRouteController::swap_pipeline(self, route_id, pipeline)
2045    }
2046
2047    fn route_from_uri(&self, route_id: &str) -> Option<String> {
2048        // Call the inherent method which now returns Option<String>
2049        DefaultRouteController::route_from_uri(self, route_id)
2050    }
2051
2052    fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
2053        DefaultRouteController::set_error_handler(self, config)
2054    }
2055
2056    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
2057        DefaultRouteController::set_self_ref(self, self_ref)
2058    }
2059
2060    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
2061        DefaultRouteController::set_runtime_handle(self, runtime)
2062    }
2063
2064    fn route_count(&self) -> usize {
2065        DefaultRouteController::route_count(self)
2066    }
2067
2068    fn in_flight_count(&self, route_id: &str) -> Option<u64> {
2069        DefaultRouteController::in_flight_count(self, route_id)
2070    }
2071
2072    fn route_exists(&self, route_id: &str) -> bool {
2073        DefaultRouteController::route_exists(self, route_id)
2074    }
2075
2076    fn route_ids(&self) -> Vec<String> {
2077        DefaultRouteController::route_ids(self)
2078    }
2079
2080    fn auto_startup_route_ids(&self) -> Vec<String> {
2081        DefaultRouteController::auto_startup_route_ids(self)
2082    }
2083
2084    fn shutdown_route_ids(&self) -> Vec<String> {
2085        DefaultRouteController::shutdown_route_ids(self)
2086    }
2087
2088    fn set_tracer_config(&mut self, config: &TracerConfig) {
2089        DefaultRouteController::set_tracer_config(self, config)
2090    }
2091
2092    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError> {
2093        DefaultRouteController::compile_route_definition(self, def)
2094    }
2095
2096    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
2097        DefaultRouteController::remove_route(self, route_id)
2098    }
2099
2100    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
2101        DefaultRouteController::start_route(self, route_id).await
2102    }
2103
2104    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
2105        DefaultRouteController::stop_route(self, route_id).await
2106    }
2107}
2108
2109#[cfg(test)]
2110mod tests {
2111    use super::*;
2112    use crate::shared::components::domain::Registry;
2113
2114    fn build_controller() -> DefaultRouteController {
2115        DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())))
2116    }
2117
2118    fn build_controller_with_components() -> DefaultRouteController {
2119        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2120        {
2121            let mut guard = registry.lock().expect("registry lock");
2122            guard.register(camel_component_timer::TimerComponent::new());
2123            guard.register(camel_component_mock::MockComponent::new());
2124            guard.register(camel_component_log::LogComponent::new());
2125        }
2126        DefaultRouteController::new(registry)
2127    }
2128
2129    fn set_self_ref(controller: &mut DefaultRouteController) {
2130        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2131        let other: Arc<Mutex<dyn RouteController>> =
2132            Arc::new(Mutex::new(DefaultRouteController::new(registry)));
2133        controller.set_self_ref(other);
2134    }
2135
2136    fn register_simple_language(controller: &mut DefaultRouteController) {
2137        controller.languages.lock().expect("languages lock").insert(
2138            "simple".into(),
2139            Arc::new(camel_language_simple::SimpleLanguage),
2140        );
2141    }
2142
2143    #[test]
2144    fn test_route_controller_internal_is_object_safe() {
2145        let _: Option<Box<dyn RouteControllerInternal>> = None;
2146    }
2147
2148    #[test]
2149    fn helper_functions_cover_non_async_branches() {
2150        let managed = ManagedRoute {
2151            definition: RouteDefinition::new("timer:a", vec![])
2152                .with_route_id("r")
2153                .to_info(),
2154            from_uri: "timer:a".into(),
2155            pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(BoxProcessor::new(
2156                IdentityProcessor,
2157            )))),
2158            concurrency: None,
2159            consumer_handle: None,
2160            pipeline_handle: None,
2161            consumer_cancel_token: CancellationToken::new(),
2162            pipeline_cancel_token: CancellationToken::new(),
2163            channel_sender: None,
2164            in_flight: None,
2165            aggregate_split: None,
2166            agg_service: None,
2167        };
2168
2169        assert_eq!(inferred_lifecycle_label(&managed), "Stopped");
2170        assert!(!handle_is_running(&managed.consumer_handle));
2171
2172        let cmd = runtime_failure_command("route-x", "boom");
2173        match cmd {
2174            RuntimeCommand::FailRoute {
2175                route_id, error, ..
2176            } => {
2177                assert_eq!(route_id, "route-x");
2178                assert_eq!(error, "boom");
2179            }
2180            _ => panic!("expected FailRoute command"),
2181        }
2182    }
2183
2184    #[test]
2185    fn add_route_detects_duplicates() {
2186        let mut controller = build_controller();
2187        set_self_ref(&mut controller);
2188
2189        controller
2190            .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
2191            .expect("add route");
2192
2193        let dup_err = controller
2194            .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("r1"))
2195            .expect_err("duplicate must fail");
2196        assert!(dup_err.to_string().contains("already exists"));
2197    }
2198
2199    #[test]
2200    fn route_introspection_and_ordering_helpers_work() {
2201        let mut controller = build_controller();
2202        set_self_ref(&mut controller);
2203
2204        controller
2205            .add_route(
2206                RouteDefinition::new("timer:a", vec![])
2207                    .with_route_id("a")
2208                    .with_startup_order(20),
2209            )
2210            .unwrap();
2211        controller
2212            .add_route(
2213                RouteDefinition::new("timer:b", vec![])
2214                    .with_route_id("b")
2215                    .with_startup_order(10),
2216            )
2217            .unwrap();
2218        controller
2219            .add_route(
2220                RouteDefinition::new("timer:c", vec![])
2221                    .with_route_id("c")
2222                    .with_auto_startup(false)
2223                    .with_startup_order(5),
2224            )
2225            .unwrap();
2226
2227        assert_eq!(controller.route_count(), 3);
2228        assert_eq!(controller.route_from_uri("a"), Some("timer:a".into()));
2229        assert!(controller.route_ids().contains(&"a".to_string()));
2230        assert_eq!(
2231            controller.auto_startup_route_ids(),
2232            vec!["b".to_string(), "a".to_string()]
2233        );
2234        assert_eq!(
2235            controller.shutdown_route_ids(),
2236            vec!["a".to_string(), "b".to_string(), "c".to_string()]
2237        );
2238    }
2239
2240    #[test]
2241    fn swap_pipeline_and_remove_route_behaviors() {
2242        let mut controller = build_controller();
2243        set_self_ref(&mut controller);
2244
2245        controller
2246            .add_route(RouteDefinition::new("timer:a", vec![]).with_route_id("swap"))
2247            .unwrap();
2248
2249        controller
2250            .swap_pipeline("swap", BoxProcessor::new(IdentityProcessor))
2251            .unwrap();
2252        assert!(controller.get_pipeline("swap").is_some());
2253
2254        controller.remove_route("swap").unwrap();
2255        assert_eq!(controller.route_count(), 0);
2256
2257        let err = controller
2258            .remove_route("swap")
2259            .expect_err("missing route must fail");
2260        assert!(err.to_string().contains("not found"));
2261    }
2262
2263    #[test]
2264    fn resolve_steps_covers_declarative_and_eip_variants() {
2265        use camel_api::LanguageExpressionDef;
2266        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2267
2268        let mut controller = build_controller_with_components();
2269        set_self_ref(&mut controller);
2270        register_simple_language(&mut controller);
2271
2272        let expr = |source: &str| LanguageExpressionDef {
2273            language: "simple".into(),
2274            source: source.into(),
2275        };
2276
2277        let steps = vec![
2278            BuilderStep::To("mock:out".into()),
2279            BuilderStep::Stop,
2280            BuilderStep::Log {
2281                level: camel_processor::LogLevel::Info,
2282                message: "log".into(),
2283            },
2284            BuilderStep::DeclarativeSetHeader {
2285                key: "k".into(),
2286                value: ValueSourceDef::Literal(Value::String("v".into())),
2287            },
2288            BuilderStep::DeclarativeSetHeader {
2289                key: "k2".into(),
2290                value: ValueSourceDef::Expression(expr("${body}")),
2291            },
2292            BuilderStep::DeclarativeSetBody {
2293                value: ValueSourceDef::Expression(expr("${body}")),
2294            },
2295            BuilderStep::DeclarativeFilter {
2296                predicate: expr("${body} != null"),
2297                steps: vec![BuilderStep::Stop],
2298            },
2299            BuilderStep::DeclarativeChoice {
2300                whens: vec![
2301                    crate::lifecycle::application::route_definition::DeclarativeWhenStep {
2302                        predicate: expr("${body} == 'x'"),
2303                        steps: vec![BuilderStep::Stop],
2304                    },
2305                ],
2306                otherwise: Some(vec![BuilderStep::Stop]),
2307            },
2308            BuilderStep::DeclarativeScript {
2309                expression: expr("${body}"),
2310            },
2311            BuilderStep::Split {
2312                config: SplitterConfig::new(split_body_lines())
2313                    .aggregation(AggregationStrategy::CollectAll),
2314                steps: vec![BuilderStep::Stop],
2315            },
2316            BuilderStep::DeclarativeSplit {
2317                expression: expr("${body}"),
2318                aggregation: AggregationStrategy::Original,
2319                parallel: false,
2320                parallel_limit: Some(2),
2321                stop_on_exception: true,
2322                steps: vec![BuilderStep::Stop],
2323            },
2324            BuilderStep::Aggregate {
2325                config: camel_api::AggregatorConfig::correlate_by("id")
2326                    .complete_when_size(1)
2327                    .build(),
2328            },
2329            BuilderStep::Filter {
2330                predicate: Arc::new(|_| true),
2331                steps: vec![BuilderStep::Stop],
2332            },
2333            BuilderStep::Choice {
2334                whens: vec![crate::lifecycle::application::route_definition::WhenStep {
2335                    predicate: Arc::new(|_| true),
2336                    steps: vec![BuilderStep::Stop],
2337                }],
2338                otherwise: Some(vec![BuilderStep::Stop]),
2339            },
2340            BuilderStep::WireTap {
2341                uri: "mock:tap".into(),
2342            },
2343            BuilderStep::Multicast {
2344                steps: vec![
2345                    BuilderStep::To("mock:m1".into()),
2346                    BuilderStep::To("mock:m2".into()),
2347                ],
2348                config: camel_api::MulticastConfig::new(),
2349            },
2350            BuilderStep::DeclarativeLog {
2351                level: camel_processor::LogLevel::Info,
2352                message: ValueSourceDef::Expression(expr("${body}")),
2353            },
2354            BuilderStep::Throttle {
2355                config: camel_api::ThrottlerConfig::new(10, Duration::from_millis(100)),
2356                steps: vec![BuilderStep::To("mock:t".into())],
2357            },
2358            BuilderStep::LoadBalance {
2359                config: camel_api::LoadBalancerConfig::round_robin(),
2360                steps: vec![
2361                    BuilderStep::To("mock:l1".into()),
2362                    BuilderStep::To("mock:l2".into()),
2363                ],
2364            },
2365            BuilderStep::DynamicRouter {
2366                config: camel_api::DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
2367            },
2368            BuilderStep::RoutingSlip {
2369                config: camel_api::RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
2370            },
2371        ];
2372
2373        let producer_ctx = ProducerContext::new();
2374        let resolved = controller
2375            .resolve_steps(steps, &producer_ctx, &controller.registry)
2376            .expect("resolve should succeed");
2377        assert!(!resolved.is_empty());
2378    }
2379
2380    #[test]
2381    fn resolve_steps_script_requires_mutating_language_support() {
2382        use camel_api::LanguageExpressionDef;
2383
2384        let mut controller = build_controller_with_components();
2385        set_self_ref(&mut controller);
2386        register_simple_language(&mut controller);
2387
2388        let steps = vec![BuilderStep::Script {
2389            language: "simple".into(),
2390            script: "${body}".into(),
2391        }];
2392
2393        let err = controller
2394            .resolve_steps(steps, &ProducerContext::new(), &controller.registry)
2395            .expect_err("simple script should fail for mutating expression");
2396        assert!(err.to_string().contains("does not support"));
2397
2398        let bean_missing = vec![BuilderStep::Bean {
2399            name: "unknown".into(),
2400            method: "run".into(),
2401        }];
2402        let bean_err = controller
2403            .resolve_steps(bean_missing, &ProducerContext::new(), &controller.registry)
2404            .expect_err("missing bean must fail");
2405        assert!(bean_err.to_string().contains("Bean not found"));
2406
2407        let bad_declarative = vec![BuilderStep::DeclarativeScript {
2408            expression: LanguageExpressionDef {
2409                language: "unknown".into(),
2410                source: "x".into(),
2411            },
2412        }];
2413        let lang_err = controller
2414            .resolve_steps(
2415                bad_declarative,
2416                &ProducerContext::new(),
2417                &controller.registry,
2418            )
2419            .expect_err("unknown language must fail");
2420        assert!(lang_err.to_string().contains("not registered"));
2421    }
2422
2423    #[tokio::test]
2424    async fn lifecycle_methods_report_missing_routes() {
2425        let mut controller = build_controller();
2426
2427        assert!(controller.start_route("missing").await.is_err());
2428        assert!(controller.stop_route("missing").await.is_err());
2429        assert!(controller.suspend_route("missing").await.is_err());
2430        assert!(controller.resume_route("missing").await.is_err());
2431    }
2432
2433    #[tokio::test]
2434    async fn start_stop_route_happy_path_with_timer_and_mock() {
2435        let mut controller = build_controller_with_components();
2436        set_self_ref(&mut controller);
2437
2438        let route = RouteDefinition::new(
2439            "timer:tick?period=10&repeatCount=1",
2440            vec![BuilderStep::To("mock:out".into())],
2441        )
2442        .with_route_id("rt-1");
2443        controller.add_route(route).unwrap();
2444
2445        controller.start_route("rt-1").await.unwrap();
2446        tokio::time::sleep(Duration::from_millis(40)).await;
2447        controller.stop_route("rt-1").await.unwrap();
2448
2449        controller.remove_route("rt-1").unwrap();
2450    }
2451
2452    #[tokio::test]
2453    async fn suspend_resume_and_restart_cover_execution_transitions() {
2454        let mut controller = build_controller_with_components();
2455        set_self_ref(&mut controller);
2456
2457        let route = RouteDefinition::new(
2458            "timer:tick?period=30",
2459            vec![BuilderStep::To("mock:out".into())],
2460        )
2461        .with_route_id("rt-2");
2462        controller.add_route(route).unwrap();
2463
2464        controller.start_route("rt-2").await.unwrap();
2465        controller.suspend_route("rt-2").await.unwrap();
2466        controller.resume_route("rt-2").await.unwrap();
2467        controller.restart_route("rt-2").await.unwrap();
2468        controller.stop_route("rt-2").await.unwrap();
2469    }
2470
2471    #[tokio::test]
2472    async fn remove_route_rejects_running_route() {
2473        let mut controller = build_controller_with_components();
2474        set_self_ref(&mut controller);
2475
2476        let route = RouteDefinition::new(
2477            "timer:tick?period=25",
2478            vec![BuilderStep::To("mock:out".into())],
2479        )
2480        .with_route_id("rt-running");
2481        controller.add_route(route).unwrap();
2482        controller.start_route("rt-running").await.unwrap();
2483
2484        let err = controller
2485            .remove_route("rt-running")
2486            .expect_err("running route removal must fail");
2487        assert!(err.to_string().contains("must be stopped before removal"));
2488
2489        controller.stop_route("rt-running").await.unwrap();
2490        controller.remove_route("rt-running").unwrap();
2491    }
2492
2493    #[tokio::test]
2494    async fn start_route_on_suspended_state_returns_guidance_error() {
2495        let mut controller = build_controller_with_components();
2496        set_self_ref(&mut controller);
2497
2498        let route = RouteDefinition::new(
2499            "timer:tick?period=40",
2500            vec![BuilderStep::To("mock:out".into())],
2501        )
2502        .with_route_id("rt-suspend");
2503        controller.add_route(route).unwrap();
2504
2505        controller.start_route("rt-suspend").await.unwrap();
2506        controller.suspend_route("rt-suspend").await.unwrap();
2507
2508        let err = controller
2509            .start_route("rt-suspend")
2510            .await
2511            .expect_err("start from suspended must fail");
2512        assert!(err.to_string().contains("use resume_route"));
2513
2514        controller.resume_route("rt-suspend").await.unwrap();
2515        controller.stop_route("rt-suspend").await.unwrap();
2516    }
2517
2518    #[tokio::test]
2519    async fn suspend_and_resume_validate_execution_state() {
2520        let mut controller = build_controller_with_components();
2521        set_self_ref(&mut controller);
2522
2523        controller
2524            .add_route(
2525                RouteDefinition::new("timer:tick?period=50", vec![]).with_route_id("rt-state"),
2526            )
2527            .unwrap();
2528
2529        let suspend_err = controller
2530            .suspend_route("rt-state")
2531            .await
2532            .expect_err("suspend before start must fail");
2533        assert!(suspend_err.to_string().contains("Cannot suspend route"));
2534
2535        controller.start_route("rt-state").await.unwrap();
2536        let resume_err = controller
2537            .resume_route("rt-state")
2538            .await
2539            .expect_err("resume while started must fail");
2540        assert!(resume_err.to_string().contains("Cannot resume route"));
2541
2542        controller.stop_route("rt-state").await.unwrap();
2543    }
2544
2545    #[tokio::test]
2546    async fn concurrent_concurrency_override_path_executes() {
2547        let mut controller = build_controller_with_components();
2548        set_self_ref(&mut controller);
2549
2550        let route = RouteDefinition::new(
2551            "timer:tick?period=10&repeatCount=2",
2552            vec![BuilderStep::To("mock:out".into())],
2553        )
2554        .with_route_id("rt-concurrent")
2555        .with_concurrency(ConcurrencyModel::Concurrent { max: Some(2) });
2556
2557        controller.add_route(route).unwrap();
2558        controller.start_route("rt-concurrent").await.unwrap();
2559        tokio::time::sleep(Duration::from_millis(50)).await;
2560        controller.stop_route("rt-concurrent").await.unwrap();
2561    }
2562
2563    #[tokio::test]
2564    async fn add_route_with_circuit_breaker_and_error_handler_compiles() {
2565        use camel_api::circuit_breaker::CircuitBreakerConfig;
2566        use camel_api::error_handler::ErrorHandlerConfig;
2567
2568        let mut controller = build_controller_with_components();
2569        set_self_ref(&mut controller);
2570
2571        let route = RouteDefinition::new("timer:tick?period=25", vec![BuilderStep::Stop])
2572            .with_route_id("rt-eh")
2573            .with_circuit_breaker(CircuitBreakerConfig::new())
2574            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"));
2575
2576        controller
2577            .add_route(route)
2578            .expect("route with layers should compile");
2579        controller.start_route("rt-eh").await.unwrap();
2580        controller.stop_route("rt-eh").await.unwrap();
2581    }
2582
2583    #[tokio::test]
2584    async fn compile_and_swap_errors_for_missing_route() {
2585        let mut controller = build_controller_with_components();
2586        set_self_ref(&mut controller);
2587
2588        let compiled = controller
2589            .compile_route_definition(
2590                RouteDefinition::new("timer:tick?period=10", vec![BuilderStep::Stop])
2591                    .with_route_id("compiled"),
2592            )
2593            .expect("compile should work");
2594
2595        let err = controller
2596            .swap_pipeline("nope", compiled)
2597            .expect_err("missing route swap must fail");
2598        assert!(err.to_string().contains("not found"));
2599    }
2600}